From 65c9d6e9b9a89806c3bdc67ed3948aeb125f364a Mon Sep 17 00:00:00 2001 From: "Jean Marchand (Exotic Markets)" Date: Tue, 6 Jun 2023 09:43:46 +0200 Subject: [PATCH] client: Add async to anchor-client (#2488) Co-authored-by: acheron --- CHANGELOG.md | 1 + Cargo.lock | 2 + client/Cargo.toml | 3 + client/example/Cargo.toml | 4 + client/example/run-test.sh | 24 ++ client/example/src/blocking.rs | 329 +++++++++++++++++ client/example/src/main.rs | 326 +---------------- client/example/src/nonblocking.rs | 306 ++++++++++++++++ client/src/blocking.rs | 109 ++++++ client/src/lib.rs | 334 ++++++++++-------- client/src/nonblocking.rs | 102 ++++++ tests/zero-copy/programs/zero-copy/Cargo.toml | 2 +- .../zero-copy/tests/compute_unit_test.rs | 2 +- 13 files changed, 1090 insertions(+), 454 deletions(-) create mode 100644 client/example/src/blocking.rs create mode 100644 client/example/src/nonblocking.rs create mode 100644 client/src/blocking.rs create mode 100644 client/src/nonblocking.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fd4560e20..a999c5de6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +- client: Add `async` feature flag to use an asynchronous anchor-client ([#2488](https://github.com/coral-xyz/anchor/pull/2488)). - spl: Add metadata wrappers `approve_collection_authority`, `bubblegum_set_collection_size`, `burn_edition_nft`, `burn_nft`, `revoke_collection_authority`, `set_token_standard`, `utilize`, `unverify_sized_collection_item`, `unverify_collection` ([#2430](https://github.com/coral-xyz/anchor/pull/2430)) - spl: Add `token_program` constraint to `Token`, `Mint`, and `AssociatedToken` accounts in order to override required `token_program` fields and use different token interface implementations in the same instruction ([#2460](https://github.com/coral-xyz/anchor/pull/2460)) - cli: Add support for Solidity programs. `anchor init` and `anchor new` take an option `--solidity` which creates solidity code rather than rust. `anchor build` and `anchor test` work accordingly ([#2421](https://github.com/coral-xyz/anchor/pull/2421)) diff --git a/Cargo.lock b/Cargo.lock index cd90b599e0..9485d041cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,12 +216,14 @@ version = "0.27.0" dependencies = [ "anchor-lang", "anyhow", + "futures", "regex", "serde", "solana-account-decoder", "solana-client", "solana-sdk", "thiserror", + "tokio", "url", ] diff --git a/client/Cargo.toml b/client/Cargo.toml index b4aed6db20..edfd7ad6ad 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -9,6 +9,7 @@ description = "Rust client for Anchor programs" [features] debug = [] +async = [] [dependencies] anchor-lang = { path = "../lang", version = "0.27.0" } @@ -20,3 +21,5 @@ solana-sdk = "<1.17.0" solana-account-decoder = "<1.17.0" thiserror = "1.0.20" url = "2.2.2" +tokio = { version = "1", features = ["rt", "sync"] } +futures = { version = "0.3.28" } \ No newline at end of file diff --git a/client/example/Cargo.toml b/client/example/Cargo.toml index 157b80e01b..52dd1da065 100644 --- a/client/example/Cargo.toml +++ b/client/example/Cargo.toml @@ -7,6 +7,9 @@ edition = "2021" [workspace] +[features] +async = ["anchor-client/async"] + [dependencies] anchor-client = { path = "../", features = ["debug"] } basic-2 = { path = "../../examples/tutorial/basic-2/programs/basic-2", features = ["no-entrypoint"] } @@ -17,4 +20,5 @@ events = { path = "../../tests/events/programs/events", features = ["no-entrypoi shellexpand = "2.1.0" anyhow = "1.0.32" clap = { version = "4.2.4", features = ["derive"] } +tokio = { version = "1", features = ["full"] } solana-sdk = "<1.17.0" diff --git a/client/example/run-test.sh b/client/example/run-test.sh index 4264cdf78e..13f3e47f8b 100755 --- a/client/example/run-test.sh +++ b/client/example/run-test.sh @@ -74,6 +74,30 @@ main() { --optional-pid $optional_pid \ --multithreaded + # + # Restart validator for async test + # + cleanup + solana-test-validator -r \ + --bpf-program $composite_pid ../../tests/composite/target/deploy/composite.so \ + --bpf-program $basic_2_pid ../../examples/tutorial/basic-2/target/deploy/basic_2.so \ + --bpf-program $basic_4_pid ../../examples/tutorial/basic-4/target/deploy/basic_4.so \ + --bpf-program $events_pid ../../tests/events/target/deploy/events.so \ + --bpf-program $optional_pid ../../tests/optional/target/deploy/optional.so \ + > test-validator.log & + sleep 5 + + # + # Run async test. + # + cargo run --features async -- \ + --composite-pid $composite_pid \ + --basic-2-pid $basic_2_pid \ + --basic-4-pid $basic_4_pid \ + --events-pid $events_pid \ + --optional-pid $optional_pid \ + --multithreaded + } cleanup() { diff --git a/client/example/src/blocking.rs b/client/example/src/blocking.rs new file mode 100644 index 0000000000..eaf7a01b28 --- /dev/null +++ b/client/example/src/blocking.rs @@ -0,0 +1,329 @@ +use anchor_client::solana_sdk::pubkey::Pubkey; +use anchor_client::solana_sdk::signature::{Keypair, Signer}; +use anchor_client::solana_sdk::system_instruction; +use anchor_client::{Client, Cluster}; +use anyhow::Result; +use clap::Parser; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::signature::read_keypair_file; +use solana_sdk::system_program; +// The `accounts` and `instructions` modules are generated by the framework. +use basic_2::accounts as basic_2_accounts; +use basic_2::instruction as basic_2_instruction; +use basic_2::Counter; +use events::instruction as events_instruction; +use events::MyEvent; +use optional::accounts::Initialize as OptionalInitialize; +use optional::instruction as optional_instruction; +// The `accounts` and `instructions` modules are generated by the framework. +use basic_4::accounts as basic_4_accounts; +use basic_4::instruction as basic_4_instruction; +use basic_4::Counter as CounterAccount; +// The `accounts` and `instructions` modules are generated by the framework. +use crate::Opts; +use composite::accounts::{Bar, CompositeUpdate, Foo, Initialize}; +use composite::instruction as composite_instruction; +use composite::{DummyA, DummyB}; +use optional::account::{DataAccount, DataPda}; +use std::ops::Deref; +use std::rc::Rc; +use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; + +type TestFn = &'static (dyn Fn(&Client, Pubkey) -> Result<()> + Send + Sync); + +pub fn main() -> Result<()> { + let opts = Opts::parse(); + + // Wallet and cluster params. + let payer = read_keypair_file(&*shellexpand::tilde("~/.config/solana/id.json")) + .expect("Example requires a keypair file"); + let url = Cluster::Custom( + "http://localhost:8899".to_string(), + "ws://127.0.0.1:8900".to_string(), + ); + + if !opts.multithreaded { + // Client. + let payer = Rc::new(payer); + let client = + Client::new_with_options(url.clone(), payer.clone(), CommitmentConfig::processed()); + + // Run tests on single thread with a single client using an Rc + println!("\nStarting single thread test..."); + composite(&client, opts.composite_pid)?; + basic_2(&client, opts.basic_2_pid)?; + basic_4(&client, opts.basic_4_pid)?; + + // Can also use references, since they deref to a signer + let payer: &Keypair = &payer; + let client = Client::new_with_options(url, payer, CommitmentConfig::processed()); + events(&client, opts.events_pid)?; + optional(&client, opts.optional_pid)?; + } else { + // Client. + let payer = Arc::new(payer); + let client = Client::new_with_options(url, payer, CommitmentConfig::processed()); + let client = Arc::new(client); + + // Run tests multithreaded while sharing a client + println!("\nStarting multithread test..."); + let client = Arc::new(client); + let tests: Vec<(TestFn>, Pubkey)> = vec![ + (&composite, opts.composite_pid), + (&basic_2, opts.basic_2_pid), + (&basic_4, opts.basic_4_pid), + (&events, opts.events_pid), + (&optional, opts.optional_pid), + ]; + let mut handles = vec![]; + for (test, arg) in tests { + let local_client = Arc::clone(&client); + handles.push(std::thread::spawn(move || test(&local_client, arg))); + } + for handle in handles { + assert!(handle.join().unwrap().is_ok()); + } + } + + // Success. + Ok(()) +} + +// Runs a client for examples/tutorial/composite. +// +// Make sure to run a localnet with the program deploy to run this example. +pub fn composite + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + // Program client. + let program = client.program(pid)?; + + // `Initialize` parameters. + let dummy_a = Keypair::new(); + let dummy_b = Keypair::new(); + + // Build and send a transaction. + program + .request() + .instruction(system_instruction::create_account( + &program.payer(), + &dummy_a.pubkey(), + program.rpc().get_minimum_balance_for_rent_exemption(500)?, + 500, + &program.id(), + )) + .instruction(system_instruction::create_account( + &program.payer(), + &dummy_b.pubkey(), + program.rpc().get_minimum_balance_for_rent_exemption(500)?, + 500, + &program.id(), + )) + .signer(&dummy_a) + .signer(&dummy_b) + .accounts(Initialize { + dummy_a: dummy_a.pubkey(), + dummy_b: dummy_b.pubkey(), + }) + .args(composite_instruction::Initialize) + .send()?; + + // Assert the transaction worked. + let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?; + let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?; + assert_eq!(dummy_a_account.data, 0); + assert_eq!(dummy_b_account.data, 0); + + // Build and send another transaction, using composite account parameters. + program + .request() + .accounts(CompositeUpdate { + foo: Foo { + dummy_a: dummy_a.pubkey(), + }, + bar: Bar { + dummy_b: dummy_b.pubkey(), + }, + }) + .args(composite_instruction::CompositeUpdate { + dummy_a: 1234, + dummy_b: 4321, + }) + .send()?; + + // Assert the transaction worked. + let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?; + let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?; + assert_eq!(dummy_a_account.data, 1234); + assert_eq!(dummy_b_account.data, 4321); + + println!("Composite success!"); + + Ok(()) +} + +// Runs a client for examples/tutorial/basic-2. +// +// Make sure to run a localnet with the program deploy to run this example. +pub fn basic_2 + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + let program = client.program(pid)?; + + // `Create` parameters. + let counter = Keypair::new(); + let authority = program.payer(); + + // Build and send a transaction. + program + .request() + .signer(&counter) + .accounts(basic_2_accounts::Create { + counter: counter.pubkey(), + user: authority, + system_program: system_program::ID, + }) + .args(basic_2_instruction::Create { authority }) + .send()?; + + let counter_account: Counter = program.account(counter.pubkey())?; + + assert_eq!(counter_account.authority, authority); + assert_eq!(counter_account.count, 0); + + println!("Basic 2 success!"); + + Ok(()) +} + +pub fn events + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + let program = client.program(pid)?; + + let (sender, receiver) = std::sync::mpsc::channel(); + let event_unsubscriber = program.on(move |_, event: MyEvent| { + if sender.send(event).is_err() { + println!("Error while transferring the event.") + } + })?; + + sleep(Duration::from_millis(1000)); + + program + .request() + .args(events_instruction::Initialize {}) + .send()?; + + let event = receiver.recv().unwrap(); + assert_eq!(event.data, 5); + assert_eq!(event.label, "hello".to_string()); + + event_unsubscriber.unsubscribe(); + + println!("Events success!"); + + Ok(()) +} + +pub fn basic_4 + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + let program = client.program(pid)?; + let authority = program.payer(); + let (counter, _) = Pubkey::find_program_address(&[b"counter"], &pid); + + program + .request() + .accounts(basic_4_accounts::Initialize { + counter, + authority, + system_program: system_program::ID, + }) + .args(basic_4_instruction::Initialize {}) + .send()?; + let counter_account: CounterAccount = program.account(counter)?; + assert_eq!(counter_account.authority, authority); + assert_eq!(counter_account.count, 0); + + program + .request() + .accounts(basic_4_accounts::Increment { counter, authority }) + .args(basic_4_instruction::Increment {}) + .send()?; + + let counter_account: CounterAccount = program.account(counter)?; + assert_eq!(counter_account.authority, authority); + assert_eq!(counter_account.count, 1); + + println!("Basic 4 success!"); + + Ok(()) +} + +// Runs a client for tests/optional. +// +// Make sure to run a localnet with the program deploy to run this example. +pub fn optional + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + // Program client. + let program = client.program(pid)?; + + // `Initialize` parameters. + let data_account_keypair = Keypair::new(); + + let data_account_key = data_account_keypair.pubkey(); + + let data_pda_seeds = &[DataPda::PREFIX.as_ref(), data_account_key.as_ref()]; + let data_pda_key = Pubkey::find_program_address(data_pda_seeds, &pid).0; + let required_keypair = Keypair::new(); + let value: u64 = 10; + + // Build and send a transaction. + + program + .request() + .instruction(system_instruction::create_account( + &program.payer(), + &required_keypair.pubkey(), + program + .rpc() + .get_minimum_balance_for_rent_exemption(DataAccount::LEN)?, + DataAccount::LEN as u64, + &program.id(), + )) + .signer(&data_account_keypair) + .signer(&required_keypair) + .accounts(OptionalInitialize { + payer: Some(program.payer()), + required: required_keypair.pubkey(), + system_program: Some(system_program::id()), + optional_account: Some(data_account_keypair.pubkey()), + optional_pda: None, + }) + .args(optional_instruction::Initialize { value, key: pid }) + .send() + .unwrap(); + + // Assert the transaction worked. + let required: DataAccount = program.account(required_keypair.pubkey())?; + assert_eq!(required.data, 0); + + let optional_pda = program.account::(data_pda_key); + assert!(optional_pda.is_err()); + + let optional_account: DataAccount = program.account(data_account_keypair.pubkey())?; + assert_eq!(optional_account.data, value * 2); + + println!("Optional success!"); + + Ok(()) +} diff --git a/client/example/src/main.rs b/client/example/src/main.rs index 770a9a894f..f13e76c7d1 100644 --- a/client/example/src/main.rs +++ b/client/example/src/main.rs @@ -1,33 +1,12 @@ -use anchor_client::solana_sdk::commitment_config::CommitmentConfig; use anchor_client::solana_sdk::pubkey::Pubkey; -use anchor_client::solana_sdk::signature::read_keypair_file; -use anchor_client::solana_sdk::signature::{Keypair, Signer}; -use anchor_client::solana_sdk::system_instruction; -use anchor_client::{Client, Cluster, EventContext}; use anyhow::Result; -use solana_sdk::system_program; -// The `accounts` and `instructions` modules are generated by the framework. -use basic_2::accounts as basic_2_accounts; -use basic_2::instruction as basic_2_instruction; -use basic_2::Counter; -use events::instruction as events_instruction; -use events::MyEvent; -use optional::accounts::Initialize as OptionalInitialize; -use optional::instruction as optional_instruction; -// The `accounts` and `instructions` modules are generated by the framework. -use basic_4::accounts as basic_4_accounts; -use basic_4::instruction as basic_4_instruction; -use basic_4::Counter as CounterAccount; use clap::Parser; -// The `accounts` and `instructions` modules are generated by the framework. -use composite::accounts::{Bar, CompositeUpdate, Foo, Initialize}; -use composite::instruction as composite_instruction; -use composite::{DummyA, DummyB}; -use optional::account::{DataAccount, DataPda}; -use std::ops::Deref; -use std::rc::Rc; -use std::sync::Arc; -use std::time::Duration; + +#[cfg(not(feature = "async"))] +mod blocking; + +#[cfg(feature = "async")] +mod nonblocking; #[derive(Parser, Debug)] pub struct Opts { @@ -45,296 +24,15 @@ pub struct Opts { multithreaded: bool, } -type TestFn = &'static (dyn Fn(&Client, Pubkey) -> Result<()> + Send + Sync); - // This example assumes a local validator is running with the programs // deployed at the addresses given by the CLI args. +#[cfg(not(feature = "async"))] fn main() -> Result<()> { - let opts = Opts::parse(); - - // Wallet and cluster params. - let payer = read_keypair_file(&*shellexpand::tilde("~/.config/solana/id.json")) - .expect("Example requires a keypair file"); - let url = Cluster::Custom( - "http://localhost:8899".to_string(), - "ws://127.0.0.1:8900".to_string(), - ); - - if !opts.multithreaded { - // Client. - let payer = Rc::new(payer); - let client = - Client::new_with_options(url.clone(), payer.clone(), CommitmentConfig::processed()); - - // Run tests on single thread with a single client using an Rc - println!("\nStarting single thread test..."); - composite(&client, opts.composite_pid)?; - basic_2(&client, opts.basic_2_pid)?; - basic_4(&client, opts.basic_4_pid)?; - - // Can also use references, since they deref to a signer - let payer: &Keypair = &payer; - let client = Client::new_with_options(url, payer, CommitmentConfig::processed()); - events(&client, opts.events_pid)?; - optional(&client, opts.optional_pid)?; - } else { - // Client. - let payer = Arc::new(payer); - let client = Client::new_with_options(url, payer, CommitmentConfig::processed()); - let client = Arc::new(client); - - // Run tests multithreaded while sharing a client - println!("\nStarting multithread test..."); - let client = Arc::new(client); - let tests: Vec<(TestFn>, Pubkey)> = vec![ - (&composite, opts.composite_pid), - (&basic_2, opts.basic_2_pid), - (&basic_4, opts.basic_4_pid), - (&events, opts.events_pid), - (&optional, opts.optional_pid), - ]; - let mut handles = vec![]; - for (test, arg) in tests { - let local_client = Arc::clone(&client); - handles.push(std::thread::spawn(move || test(&local_client, arg))); - } - for handle in handles { - assert!(handle.join().unwrap().is_ok()); - } - } - - // Success. - Ok(()) -} - -// Runs a client for examples/tutorial/composite. -// -// Make sure to run a localnet with the program deploy to run this example. -fn composite + Clone>( - client: &Client, - pid: Pubkey, -) -> Result<()> { - // Program client. - let program = client.program(pid); - - // `Initialize` parameters. - let dummy_a = Keypair::new(); - let dummy_b = Keypair::new(); - - // Build and send a transaction. - program - .request() - .instruction(system_instruction::create_account( - &program.payer(), - &dummy_a.pubkey(), - program.rpc().get_minimum_balance_for_rent_exemption(500)?, - 500, - &program.id(), - )) - .instruction(system_instruction::create_account( - &program.payer(), - &dummy_b.pubkey(), - program.rpc().get_minimum_balance_for_rent_exemption(500)?, - 500, - &program.id(), - )) - .signer(&dummy_a) - .signer(&dummy_b) - .accounts(Initialize { - dummy_a: dummy_a.pubkey(), - dummy_b: dummy_b.pubkey(), - }) - .args(composite_instruction::Initialize) - .send()?; - - // Assert the transaction worked. - let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?; - let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?; - assert_eq!(dummy_a_account.data, 0); - assert_eq!(dummy_b_account.data, 0); - - // Build and send another transaction, using composite account parameters. - program - .request() - .accounts(CompositeUpdate { - foo: Foo { - dummy_a: dummy_a.pubkey(), - }, - bar: Bar { - dummy_b: dummy_b.pubkey(), - }, - }) - .args(composite_instruction::CompositeUpdate { - dummy_a: 1234, - dummy_b: 4321, - }) - .send()?; - - // Assert the transaction worked. - let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?; - let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?; - assert_eq!(dummy_a_account.data, 1234); - assert_eq!(dummy_b_account.data, 4321); - - println!("Composite success!"); - - Ok(()) -} - -// Runs a client for examples/tutorial/basic-2. -// -// Make sure to run a localnet with the program deploy to run this example. -fn basic_2 + Clone>(client: &Client, pid: Pubkey) -> Result<()> { - let program = client.program(pid); - - // `Create` parameters. - let counter = Keypair::new(); - let authority = program.payer(); - - // Build and send a transaction. - program - .request() - .signer(&counter) - .accounts(basic_2_accounts::Create { - counter: counter.pubkey(), - user: authority, - system_program: system_program::ID, - }) - .args(basic_2_instruction::Create { authority }) - .send()?; - - let counter_account: Counter = program.account(counter.pubkey())?; - - assert_eq!(counter_account.authority, authority); - assert_eq!(counter_account.count, 0); - - println!("Basic 2 success!"); - - Ok(()) + blocking::main() } -fn events + Clone>(client: &Client, pid: Pubkey) -> Result<()> { - let program = client.program(pid); - - let (sender, receiver) = std::sync::mpsc::channel(); - let handle = program.on(move |_ctx: &EventContext, event: MyEvent| { - sender.send(event).unwrap(); - })?; - - std::thread::sleep(Duration::from_millis(1000)); - - program - .request() - .args(events_instruction::Initialize {}) - .send()?; - - let event = receiver.recv().unwrap(); - assert_eq!(event.data, 5); - assert_eq!(event.label, "hello".to_string()); - - // TODO: remove once https://github.com/solana-labs/solana/issues/16102 - // is addressed. Until then, drop the subscription handle in another - // thread so that we deadlock in the other thread as to not block - // this thread. - std::thread::spawn(move || { - drop(handle); - }); - - println!("Events success!"); - - Ok(()) -} - -pub fn basic_4 + Clone>( - client: &Client, - pid: Pubkey, -) -> Result<()> { - let program = client.program(pid); - let authority = program.payer(); - let (counter, _) = Pubkey::find_program_address(&[b"counter"], &pid); - - program - .request() - .accounts(basic_4_accounts::Initialize { - counter, - authority, - system_program: system_program::ID, - }) - .args(basic_4_instruction::Initialize {}) - .send()?; - let counter_account: CounterAccount = program.account(counter)?; - assert_eq!(counter_account.authority, authority); - assert_eq!(counter_account.count, 0); - - program - .request() - .accounts(basic_4_accounts::Increment { counter, authority }) - .args(basic_4_instruction::Increment {}) - .send()?; - - let counter_account: CounterAccount = program.account(counter)?; - assert_eq!(counter_account.authority, authority); - assert_eq!(counter_account.count, 1); - - println!("Basic 4 success!"); - - Ok(()) -} - -// Runs a client for tests/optional. -// -// Make sure to run a localnet with the program deploy to run this example. -fn optional + Clone>(client: &Client, pid: Pubkey) -> Result<()> { - // Program client. - let program = client.program(pid); - - // `Initialize` parameters. - let data_account_keypair = Keypair::new(); - - let data_account_key = data_account_keypair.pubkey(); - - let data_pda_seeds = &[DataPda::PREFIX.as_ref(), data_account_key.as_ref()]; - let data_pda_key = Pubkey::find_program_address(data_pda_seeds, &pid).0; - let required_keypair = Keypair::new(); - let value: u64 = 10; - - // Build and send a transaction. - - program - .request() - .instruction(system_instruction::create_account( - &program.payer(), - &required_keypair.pubkey(), - program - .rpc() - .get_minimum_balance_for_rent_exemption(DataAccount::LEN)?, - DataAccount::LEN as u64, - &program.id(), - )) - .signer(&data_account_keypair) - .signer(&required_keypair) - .accounts(OptionalInitialize { - payer: Some(program.payer()), - required: required_keypair.pubkey(), - system_program: Some(system_program::id()), - optional_account: Some(data_account_keypair.pubkey()), - optional_pda: None, - }) - .args(optional_instruction::Initialize { value, key: pid }) - .send() - .unwrap(); - - // Assert the transaction worked. - let required: DataAccount = program.account(required_keypair.pubkey())?; - assert_eq!(required.data, 0); - - let optional_pda = program.account::(data_pda_key); - assert!(optional_pda.is_err()); - - let optional_account: DataAccount = program.account(data_account_keypair.pubkey())?; - assert_eq!(optional_account.data, value * 2); - - println!("Optional success!"); - - Ok(()) +#[cfg(feature = "async")] +#[tokio::main] +async fn main() -> Result<()> { + nonblocking::main().await } diff --git a/client/example/src/nonblocking.rs b/client/example/src/nonblocking.rs new file mode 100644 index 0000000000..eeee5d4f18 --- /dev/null +++ b/client/example/src/nonblocking.rs @@ -0,0 +1,306 @@ +use anchor_client::solana_sdk::pubkey::Pubkey; +use anchor_client::solana_sdk::signature::{Keypair, Signer}; +use anchor_client::solana_sdk::system_instruction; +use anchor_client::{Client, Cluster}; +use anyhow::Result; +use clap::Parser; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::signature::read_keypair_file; +use solana_sdk::system_program; +// The `accounts` and `instructions` modules are generated by the framework. +use basic_2::accounts as basic_2_accounts; +use basic_2::instruction as basic_2_instruction; +use basic_2::Counter; +use events::instruction as events_instruction; +use events::MyEvent; +use optional::accounts::Initialize as OptionalInitialize; +use optional::instruction as optional_instruction; +// The `accounts` and `instructions` modules are generated by the framework. +use basic_4::accounts as basic_4_accounts; +use basic_4::instruction as basic_4_instruction; +use basic_4::Counter as CounterAccount; +// The `accounts` and `instructions` modules are generated by the framework. +use crate::Opts; +use composite::accounts::{Bar, CompositeUpdate, Foo, Initialize}; +use composite::instruction as composite_instruction; +use composite::{DummyA, DummyB}; +use optional::account::{DataAccount, DataPda}; +use std::ops::Deref; +use std::rc::Rc; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::time::sleep; + +pub async fn main() -> Result<()> { + let opts = Opts::parse(); + + // Wallet and cluster params. + let payer = read_keypair_file(&*shellexpand::tilde("~/.config/solana/id.json")) + .expect("Example requires a keypair file"); + let url = Cluster::Custom( + "http://localhost:8899".to_string(), + "ws://127.0.0.1:8900".to_string(), + ); + + // Client. + let payer = Rc::new(payer); + let client = + Client::new_with_options(url.clone(), payer.clone(), CommitmentConfig::processed()); + + println!("\nStarting async test..."); + composite(&client, opts.composite_pid).await?; + basic_2(&client, opts.basic_2_pid).await?; + basic_4(&client, opts.basic_4_pid).await?; + + // Can also use references, since they deref to a signer + let payer: &Keypair = &payer; + let client = Client::new_with_options(url, payer, CommitmentConfig::processed()); + events(&client, opts.events_pid).await?; + optional(&client, opts.optional_pid).await?; + // Success. + Ok(()) +} + +pub async fn composite + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + // Program client. + let program = client.program(pid)?; + + // `Initialize` parameters. + let dummy_a = Keypair::new(); + let dummy_b = Keypair::new(); + + // Build and send a transaction. + program + .request() + .instruction(system_instruction::create_account( + &program.payer(), + &dummy_a.pubkey(), + program + .async_rpc() + .get_minimum_balance_for_rent_exemption(500) + .await?, + 500, + &program.id(), + )) + .instruction(system_instruction::create_account( + &program.payer(), + &dummy_b.pubkey(), + program + .async_rpc() + .get_minimum_balance_for_rent_exemption(500) + .await?, + 500, + &program.id(), + )) + .signer(&dummy_a) + .signer(&dummy_b) + .accounts(Initialize { + dummy_a: dummy_a.pubkey(), + dummy_b: dummy_b.pubkey(), + }) + .args(composite_instruction::Initialize) + .send() + .await?; + + // Assert the transaction worked. + let dummy_a_account: DummyA = program.account(dummy_a.pubkey()).await?; + let dummy_b_account: DummyB = program.account(dummy_b.pubkey()).await?; + assert_eq!(dummy_a_account.data, 0); + assert_eq!(dummy_b_account.data, 0); + + // Build and send another transaction, using composite account parameters. + program + .request() + .accounts(CompositeUpdate { + foo: Foo { + dummy_a: dummy_a.pubkey(), + }, + bar: Bar { + dummy_b: dummy_b.pubkey(), + }, + }) + .args(composite_instruction::CompositeUpdate { + dummy_a: 1234, + dummy_b: 4321, + }) + .send() + .await?; + + // Assert the transaction worked. + let dummy_a_account: DummyA = program.account(dummy_a.pubkey()).await?; + let dummy_b_account: DummyB = program.account(dummy_b.pubkey()).await?; + assert_eq!(dummy_a_account.data, 1234); + assert_eq!(dummy_b_account.data, 4321); + + println!("Composite success!"); + + Ok(()) +} + +pub async fn basic_2 + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + let program = client.program(pid)?; + + // `Create` parameters. + let counter = Keypair::new(); + let authority = program.payer(); + + // Build and send a transaction. + program + .request() + .signer(&counter) + .accounts(basic_2_accounts::Create { + counter: counter.pubkey(), + user: authority, + system_program: system_program::ID, + }) + .args(basic_2_instruction::Create { authority }) + .send() + .await?; + + let counter_account: Counter = program.account(counter.pubkey()).await?; + + assert_eq!(counter_account.authority, authority); + assert_eq!(counter_account.count, 0); + + println!("Basic 2 success!"); + + Ok(()) +} + +pub async fn events + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + let program = client.program(pid)?; + + let (sender, mut receiver) = mpsc::unbounded_channel(); + let event_unsubscriber = program + .on(move |_, event: MyEvent| { + if sender.send(event).is_err() { + println!("Error while transferring the event.") + } + }) + .await?; + + sleep(Duration::from_millis(1000)).await; + + program + .request() + .args(events_instruction::Initialize {}) + .send() + .await?; + + let event = receiver.recv().await.unwrap(); + assert_eq!(event.data, 5); + assert_eq!(event.label, "hello".to_string()); + + event_unsubscriber.unsubscribe().await; + + println!("Events success!"); + + Ok(()) +} + +pub async fn basic_4 + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + let program = client.program(pid)?; + let authority = program.payer(); + let (counter, _) = Pubkey::find_program_address(&[b"counter"], &pid); + + program + .request() + .accounts(basic_4_accounts::Initialize { + counter, + authority, + system_program: system_program::ID, + }) + .args(basic_4_instruction::Initialize {}) + .send() + .await?; + let counter_account: CounterAccount = program.account(counter).await?; + assert_eq!(counter_account.authority, authority); + assert_eq!(counter_account.count, 0); + + program + .request() + .accounts(basic_4_accounts::Increment { counter, authority }) + .args(basic_4_instruction::Increment {}) + .send() + .await?; + + let counter_account: CounterAccount = program.account(counter).await?; + assert_eq!(counter_account.authority, authority); + assert_eq!(counter_account.count, 1); + + println!("Basic 4 success!"); + + Ok(()) +} + +pub async fn optional + Clone>( + client: &Client, + pid: Pubkey, +) -> Result<()> { + // Program client. + let program = client.program(pid)?; + + // `Initialize` parameters. + let data_account_keypair = Keypair::new(); + + let data_account_key = data_account_keypair.pubkey(); + + let data_pda_seeds = &[DataPda::PREFIX.as_ref(), data_account_key.as_ref()]; + let data_pda_key = Pubkey::find_program_address(data_pda_seeds, &pid).0; + let required_keypair = Keypair::new(); + let value: u64 = 10; + + // Build and send a transaction. + + program + .request() + .instruction(system_instruction::create_account( + &program.payer(), + &required_keypair.pubkey(), + program + .async_rpc() + .get_minimum_balance_for_rent_exemption(DataAccount::LEN) + .await?, + DataAccount::LEN as u64, + &program.id(), + )) + .signer(&data_account_keypair) + .signer(&required_keypair) + .accounts(OptionalInitialize { + payer: Some(program.payer()), + required: required_keypair.pubkey(), + system_program: Some(system_program::id()), + optional_account: Some(data_account_keypair.pubkey()), + optional_pda: None, + }) + .args(optional_instruction::Initialize { value, key: pid }) + .send() + .await + .unwrap(); + + // Assert the transaction worked. + let required: DataAccount = program.account(required_keypair.pubkey()).await?; + assert_eq!(required.data, 0); + + let optional_pda = program.account::(data_pda_key).await; + assert!(optional_pda.is_err()); + + let optional_account: DataAccount = program.account(data_account_keypair.pubkey()).await?; + assert_eq!(optional_account.data, value * 2); + + println!("Optional success!"); + + Ok(()) +} diff --git a/client/src/blocking.rs b/client/src/blocking.rs new file mode 100644 index 0000000000..492729d630 --- /dev/null +++ b/client/src/blocking.rs @@ -0,0 +1,109 @@ +use crate::{ + ClientError, Config, EventContext, EventUnsubscriber, Program, ProgramAccountsIterator, + RequestBuilder, +}; +use anchor_lang::{prelude::Pubkey, AccountDeserialize, Discriminator}; +use solana_client::{rpc_config::RpcSendTransactionConfig, rpc_filter::RpcFilterType}; +use solana_sdk::{ + commitment_config::CommitmentConfig, signature::Signature, signer::Signer, + transaction::Transaction, +}; +use std::{marker::PhantomData, ops::Deref, sync::Arc}; +use tokio::{ + runtime::{Builder, Handle}, + sync::RwLock, +}; + +impl<'a> EventUnsubscriber<'a> { + /// Unsubscribe gracefully. + pub fn unsubscribe(self) { + self.runtime_handle.block_on(self.unsubscribe_internal()) + } +} + +impl + Clone> Program { + pub fn new(program_id: Pubkey, cfg: Config) -> Result { + let rt: tokio::runtime::Runtime = Builder::new_multi_thread().enable_all().build()?; + + Ok(Self { + program_id, + cfg, + sub_client: Arc::new(RwLock::new(None)), + rt, + }) + } + + /// Returns the account at the given address. + pub fn account(&self, address: Pubkey) -> Result { + self.rt.block_on(self.account_internal(address)) + } + + /// Returns all program accounts of the given type matching the given filters + pub fn accounts( + &self, + filters: Vec, + ) -> Result, ClientError> { + self.accounts_lazy(filters)?.collect() + } + + /// Returns all program accounts of the given type matching the given filters as an iterator + /// Deserialization is executed lazily + pub fn accounts_lazy( + &self, + filters: Vec, + ) -> Result, ClientError> { + self.rt.block_on(self.accounts_lazy_internal(filters)) + } + + pub fn on( + &self, + f: impl Fn(&EventContext, T) + Send + 'static, + ) -> Result { + let (handle, rx) = self.rt.block_on(self.on_internal(f))?; + + Ok(EventUnsubscriber { + handle, + rx, + runtime_handle: self.rt.handle(), + _lifetime_marker: PhantomData, + }) + } +} + +impl<'a, C: Deref + Clone> RequestBuilder<'a, C> { + pub fn from( + program_id: Pubkey, + cluster: &str, + payer: C, + options: Option, + handle: &'a Handle, + ) -> Self { + Self { + program_id, + payer, + cluster: cluster.to_string(), + accounts: Vec::new(), + options: options.unwrap_or_default(), + instructions: Vec::new(), + instruction_data: None, + signers: Vec::new(), + handle, + } + } + + pub fn signed_transaction(&self) -> Result { + self.handle.block_on(self.signed_transaction_internal()) + } + + pub fn send(&self) -> Result { + self.handle.block_on(self.send_internal()) + } + + pub fn send_with_spinner_and_config( + &self, + config: RpcSendTransactionConfig, + ) -> Result { + self.handle + .block_on(self.send_with_spinner_and_config_internal(config)) + } +} diff --git a/client/src/lib.rs b/client/src/lib.rs index 477914a6a1..c25624a11c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -6,27 +6,42 @@ use anchor_lang::solana_program::instruction::{AccountMeta, Instruction}; use anchor_lang::solana_program::program_error::ProgramError; use anchor_lang::solana_program::pubkey::Pubkey; use anchor_lang::{AccountDeserialize, Discriminator, InstructionData, ToAccountMetas}; +use futures::{Future, StreamExt}; use regex::Regex; use solana_account_decoder::UiAccountEncoding; -use solana_client::client_error::ClientError as SolanaClientError; -use solana_client::nonblocking::rpc_client::RpcClient as AsyncRpcClient; -use solana_client::pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}; -use solana_client::rpc_client::RpcClient; use solana_client::rpc_config::{ RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSendTransactionConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, }; use solana_client::rpc_filter::{Memcmp, RpcFilterType}; -use solana_client::rpc_response::{Response as RpcResponse, RpcLogsResponse}; +use solana_client::{ + client_error::ClientError as SolanaClientError, + nonblocking::{ + pubsub_client::{PubsubClient, PubsubClientError}, + rpc_client::RpcClient as AsyncRpcClient, + }, + rpc_client::RpcClient, + rpc_response::{Response as RpcResponse, RpcLogsResponse}, +}; use solana_sdk::account::Account; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::{Signature, Signer}; use solana_sdk::transaction::Transaction; -use std::convert::Into; use std::iter::Map; +use std::marker::PhantomData; use std::ops::Deref; +use std::pin::Pin; +use std::sync::Arc; use std::vec::IntoIter; use thiserror::Error; +use tokio::{ + runtime::Handle, + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + RwLock, + }, + task::JoinHandle, +}; pub use anchor_lang; pub use cluster::Cluster; @@ -35,12 +50,15 @@ pub use solana_sdk; mod cluster; +#[cfg(not(feature = "async"))] +mod blocking; +#[cfg(feature = "async")] +mod nonblocking; + const PROGRAM_LOG: &str = "Program log: "; const PROGRAM_DATA: &str = "Program data: "; -/// EventHandle unsubscribes from a program event stream on drop. -pub type EventHandle = PubsubClientSubscription>; - +type UnsubscribeFn = Box Pin + Send>> + Send>; /// Client defines the base configuration for building RPC clients to /// communicate with Anchor programs running on a Solana cluster. It's /// primary use is to build a `Program` client via the `program` method. @@ -69,31 +87,50 @@ impl> Client { } } - pub fn program(&self, program_id: Pubkey) -> Program { - Program { - program_id, - cfg: Config { - cluster: self.cfg.cluster.clone(), - options: self.cfg.options, - payer: self.cfg.payer.clone(), - }, - } + pub fn program(&self, program_id: Pubkey) -> Result, ClientError> { + let cfg = Config { + cluster: self.cfg.cluster.clone(), + options: self.cfg.options, + payer: self.cfg.payer.clone(), + }; + + Program::new(program_id, cfg) } } // Internal configuration for a client. #[derive(Debug)] -struct Config { +pub struct Config { cluster: Cluster, payer: C, options: Option, } +pub struct EventUnsubscriber<'a> { + handle: JoinHandle>, + rx: UnboundedReceiver, + #[cfg(not(feature = "async"))] + runtime_handle: &'a Handle, + _lifetime_marker: PhantomData<&'a Handle>, +} + +impl<'a> EventUnsubscriber<'a> { + async fn unsubscribe_internal(mut self) { + if let Some(unsubscribe) = self.rx.recv().await { + unsubscribe().await; + } + + let _ = self.handle.await; + } +} + /// Program is the primary client handle to be used to build and send requests. -#[derive(Debug)] pub struct Program { program_id: Pubkey, cfg: Config, + sub_client: Arc>>, + #[cfg(not(feature = "async"))] + rt: tokio::runtime::Runtime, } impl + Clone> Program { @@ -108,34 +145,47 @@ impl + Clone> Program { self.cfg.cluster.url(), self.cfg.payer.clone(), self.cfg.options, + #[cfg(not(feature = "async"))] + self.rt.handle(), + ) + } + + pub fn id(&self) -> Pubkey { + self.program_id + } + + pub fn rpc(&self) -> RpcClient { + RpcClient::new_with_commitment( + self.cfg.cluster.url().to_string(), + self.cfg.options.unwrap_or_default(), ) } - /// Returns the account at the given address. - pub fn account(&self, address: Pubkey) -> Result { - let rpc_client = RpcClient::new_with_commitment( + pub fn async_rpc(&self) -> AsyncRpcClient { + AsyncRpcClient::new_with_commitment( + self.cfg.cluster.url().to_string(), + self.cfg.options.unwrap_or_default(), + ) + } + + async fn account_internal( + &self, + address: Pubkey, + ) -> Result { + let rpc_client = AsyncRpcClient::new_with_commitment( self.cfg.cluster.url().to_string(), self.cfg.options.unwrap_or_default(), ); let account = rpc_client - .get_account_with_commitment(&address, CommitmentConfig::processed())? + .get_account_with_commitment(&address, CommitmentConfig::processed()) + .await? .value .ok_or(ClientError::AccountNotFound)?; let mut data: &[u8] = &account.data; T::try_deserialize(&mut data).map_err(Into::into) } - /// Returns all program accounts of the given type matching the given filters - pub fn accounts( - &self, - filters: Vec, - ) -> Result, ClientError> { - self.accounts_lazy(filters)?.collect() - } - - /// Returns all program accounts of the given type matching the given filters as an iterator - /// Deserialization is executed lazily - pub fn accounts_lazy( + async fn accounts_lazy_internal( &self, filters: Vec, ) -> Result, ClientError> { @@ -151,8 +201,9 @@ impl + Clone> Program { }; Ok(ProgramAccountsIterator { inner: self - .rpc() - .get_program_accounts_with_config(&self.id(), config)? + .async_rpc() + .get_program_accounts_with_config(&self.id(), config) + .await? .into_iter() .map(|(key, account)| { Ok((key, T::try_deserialize(&mut (&account.data as &[u8]))?)) @@ -160,86 +211,64 @@ impl + Clone> Program { }) } - pub fn rpc(&self) -> RpcClient { - RpcClient::new_with_commitment( - self.cfg.cluster.url().to_string(), - self.cfg.options.unwrap_or_default(), - ) - } + async fn init_sub_client_if_needed(&self) -> Result<(), ClientError> { + let lock = &self.sub_client; + let mut client = lock.write().await; - pub fn async_rpc(&self) -> AsyncRpcClient { - AsyncRpcClient::new_with_commitment( - self.cfg.cluster.url().to_string(), - self.cfg.options.unwrap_or_default(), - ) - } + if client.is_none() { + let sub_client = PubsubClient::new(self.cfg.cluster.ws_url()).await?; + *client = Some(sub_client); + } - pub fn id(&self) -> Pubkey { - self.program_id + Ok(()) } - pub fn on( + async fn on_internal( &self, f: impl Fn(&EventContext, T) + Send + 'static, - ) -> Result { - let addresses = vec![self.program_id.to_string()]; - let filter = RpcTransactionLogsFilter::Mentions(addresses); - let ws_url = self.cfg.cluster.ws_url().to_string(); - let cfg = RpcTransactionLogsConfig { + ) -> Result< + ( + JoinHandle>, + UnboundedReceiver, + ), + ClientError, + > { + self.init_sub_client_if_needed().await?; + let (tx, rx) = unbounded_channel::<_>(); + let config = RpcTransactionLogsConfig { commitment: self.cfg.options, }; - let self_program_str = self.program_id.to_string(); - let (client, receiver) = PubsubClient::logs_subscribe(&ws_url, filter, cfg)?; - std::thread::spawn(move || { - loop { - match receiver.recv() { - Ok(logs) => { - let ctx = EventContext { - signature: logs.value.signature.parse().unwrap(), - slot: logs.context.slot, - }; - let mut logs = &logs.value.logs[..]; - if !logs.is_empty() { - if let Ok(mut execution) = Execution::new(&mut logs) { - for l in logs { - // Parse the log. - let (event, new_program, did_pop) = { - if self_program_str == execution.program() { - handle_program_log(&self_program_str, l).unwrap_or_else( - |e| { - println!("Unable to parse log: {e}"); - std::process::exit(1); - }, - ) - } else { - let (program, did_pop) = - handle_system_log(&self_program_str, l); - (None, program, did_pop) - } - }; - // Emit the event. - if let Some(e) = event { - f(&ctx, e); - } - // Switch program context on CPI. - if let Some(new_program) = new_program { - execution.push(new_program); - } - // Program returned. - if did_pop { - execution.pop(); - } - } - } - } - } - Err(_err) => { - return; + let program_id_str = self.program_id.to_string(); + let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]); + + let lock = Arc::clone(&self.sub_client); + + let handle = tokio::spawn(async move { + if let Some(ref client) = *lock.read().await { + let (mut notifications, unsubscribe) = + client.logs_subscribe(filter, config).await?; + + tx.send(unsubscribe).map_err(|e| { + ClientError::SolanaClientPubsubError(PubsubClientError::UnexpectedMessageError( + e.to_string(), + )) + })?; + + while let Some(logs) = notifications.next().await { + let ctx = EventContext { + signature: logs.value.signature.parse().unwrap(), + slot: logs.context.slot, + }; + let events = parse_logs_response(logs, &program_id_str); + for e in events { + f(&ctx, e); } } } + Ok::<(), ClientError>(()) }); - Ok(client) + + Ok((handle, rx)) } } @@ -373,6 +402,8 @@ pub enum ClientError { SolanaClientPubsubError(#[from] PubsubClientError), #[error("Unable to parse log: {0}")] LogParseError(String), + #[error(transparent)] + IOError(#[from] std::io::Error), } /// `RequestBuilder` provides a builder interface to create and send @@ -387,27 +418,11 @@ pub struct RequestBuilder<'a, C> { // Serialized instruction data for the target RPC. instruction_data: Option>, signers: Vec<&'a dyn Signer>, + #[cfg(not(feature = "async"))] + handle: &'a Handle, } impl<'a, C: Deref + Clone> RequestBuilder<'a, C> { - pub fn from( - program_id: Pubkey, - cluster: &str, - payer: C, - options: Option, - ) -> Self { - Self { - program_id, - payer, - cluster: cluster.to_string(), - accounts: Vec::new(), - options: options.unwrap_or_default(), - instructions: Vec::new(), - instruction_data: None, - signers: Vec::new(), - } - } - #[must_use] pub fn payer(mut self, payer: C) -> Self { self.payer = payer; @@ -488,36 +503,39 @@ impl<'a, C: Deref + Clone> RequestBuilder<'a, C> { Ok(tx) } - pub fn signed_transaction(&self) -> Result { - let latest_hash = - RpcClient::new_with_commitment(&self.cluster, self.options).get_latest_blockhash()?; - let tx = self.signed_transaction_with_blockhash(latest_hash)?; - - Ok(tx) - } - pub fn transaction(&self) -> Result { let instructions = &self.instructions; let tx = Transaction::new_with_payer(instructions, Some(&self.payer.pubkey())); Ok(tx) } - pub fn send(self) -> Result { - let rpc_client = RpcClient::new_with_commitment(&self.cluster, self.options); - let latest_hash = rpc_client.get_latest_blockhash()?; + async fn signed_transaction_internal(&self) -> Result { + let latest_hash = + AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options) + .get_latest_blockhash() + .await?; + let tx = self.signed_transaction_with_blockhash(latest_hash)?; + + Ok(tx) + } + + async fn send_internal(&self) -> Result { + let rpc_client = AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options); + let latest_hash = rpc_client.get_latest_blockhash().await?; let tx = self.signed_transaction_with_blockhash(latest_hash)?; rpc_client .send_and_confirm_transaction(&tx) + .await .map_err(Into::into) } - pub fn send_with_spinner_and_config( - self, + async fn send_with_spinner_and_config_internal( + &self, config: RpcSendTransactionConfig, ) -> Result { - let rpc_client = RpcClient::new_with_commitment(&self.cluster, self.options); - let latest_hash = rpc_client.get_latest_blockhash()?; + let rpc_client = AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options); + let latest_hash = rpc_client.get_latest_blockhash().await?; let tx = self.signed_transaction_with_blockhash(latest_hash)?; rpc_client @@ -526,10 +544,50 @@ impl<'a, C: Deref + Clone> RequestBuilder<'a, C> { rpc_client.commitment(), config, ) + .await .map_err(Into::into) } } +fn parse_logs_response( + logs: RpcResponse, + program_id_str: &str, +) -> Vec { + let mut logs = &logs.value.logs[..]; + let mut events: Vec = Vec::new(); + if !logs.is_empty() { + if let Ok(mut execution) = Execution::new(&mut logs) { + for l in logs { + // Parse the log. + let (event, new_program, did_pop) = { + if program_id_str == execution.program() { + handle_program_log(program_id_str, l).unwrap_or_else(|e| { + println!("Unable to parse log: {e}"); + std::process::exit(1); + }) + } else { + let (program, did_pop) = handle_system_log(program_id_str, l); + (None, program, did_pop) + } + }; + // Emit the event. + if let Some(e) = event { + events.push(e); + } + // Switch program context on CPI. + if let Some(new_program) = new_program { + execution.push(new_program); + } + // Program returned. + if did_pop { + execution.pop(); + } + } + } + } + events +} + #[cfg(test)] mod tests { use super::*; diff --git a/client/src/nonblocking.rs b/client/src/nonblocking.rs new file mode 100644 index 0000000000..93f06c114b --- /dev/null +++ b/client/src/nonblocking.rs @@ -0,0 +1,102 @@ +use crate::{ + ClientError, Config, EventContext, EventUnsubscriber, Program, ProgramAccountsIterator, + RequestBuilder, +}; +use anchor_lang::{prelude::Pubkey, AccountDeserialize, Discriminator}; +use solana_client::{rpc_config::RpcSendTransactionConfig, rpc_filter::RpcFilterType}; +use solana_sdk::{ + commitment_config::CommitmentConfig, signature::Signature, signer::Signer, + transaction::Transaction, +}; +use std::{marker::PhantomData, ops::Deref, sync::Arc}; +use tokio::sync::RwLock; + +impl<'a> EventUnsubscriber<'a> { + /// Unsubscribe gracefully. + pub async fn unsubscribe(self) { + self.unsubscribe_internal().await + } +} + +impl + Clone> Program { + pub fn new(program_id: Pubkey, cfg: Config) -> Result { + Ok(Self { + program_id, + cfg, + sub_client: Arc::new(RwLock::new(None)), + }) + } + + /// Returns the account at the given address. + pub async fn account(&self, address: Pubkey) -> Result { + self.account_internal(address).await + } + + /// Returns all program accounts of the given type matching the given filters + pub async fn accounts( + &self, + filters: Vec, + ) -> Result, ClientError> { + self.accounts_lazy(filters).await?.collect() + } + + /// Returns all program accounts of the given type matching the given filters as an iterator + /// Deserialization is executed lazily + pub async fn accounts_lazy( + &self, + filters: Vec, + ) -> Result, ClientError> { + self.accounts_lazy_internal(filters).await + } + + /// Subscribe to program logs. + /// + /// Returns an [`EventUnsubscriber`] to unsubscribe and close connection gracefully. + pub async fn on( + &self, + f: impl Fn(&EventContext, T) + Send + 'static, + ) -> Result { + let (handle, rx) = self.on_internal(f).await?; + + Ok(EventUnsubscriber { + handle, + rx, + _lifetime_marker: PhantomData, + }) + } +} + +impl<'a, C: Deref + Clone> RequestBuilder<'a, C> { + pub fn from( + program_id: Pubkey, + cluster: &str, + payer: C, + options: Option, + ) -> Self { + Self { + program_id, + payer, + cluster: cluster.to_string(), + accounts: Vec::new(), + options: options.unwrap_or_default(), + instructions: Vec::new(), + instruction_data: None, + signers: Vec::new(), + } + } + + pub async fn signed_transaction(&self) -> Result { + self.signed_transaction_internal().await + } + + pub async fn send(self) -> Result { + self.send_internal().await + } + + pub async fn send_with_spinner_and_config( + self, + config: RpcSendTransactionConfig, + ) -> Result { + self.send_with_spinner_and_config_internal(config).await + } +} diff --git a/tests/zero-copy/programs/zero-copy/Cargo.toml b/tests/zero-copy/programs/zero-copy/Cargo.toml index 21c5450906..e0d8664cf8 100644 --- a/tests/zero-copy/programs/zero-copy/Cargo.toml +++ b/tests/zero-copy/programs/zero-copy/Cargo.toml @@ -21,5 +21,5 @@ anchor-lang = { path = "../../../../lang" } bytemuck = {version = "1.4.0", features = ["derive", "min_const_generics"]} [dev-dependencies] -anchor-client = { path = "../../../../client", features = ["debug"] } +anchor-client = { path = "../../../../client", features = ["debug", "async"] } solana-program-test = "<1.17.0" diff --git a/tests/zero-copy/programs/zero-copy/tests/compute_unit_test.rs b/tests/zero-copy/programs/zero-copy/tests/compute_unit_test.rs index deada7d792..5fa1a1d257 100644 --- a/tests/zero-copy/programs/zero-copy/tests/compute_unit_test.rs +++ b/tests/zero-copy/programs/zero-copy/tests/compute_unit_test.rs @@ -46,7 +46,7 @@ async fn update_foo() { Rc::new(Keypair::new()), CommitmentConfig::processed(), ); - let program = client.program(zero_copy::id()); + let program = client.program(zero_copy::id()).unwrap(); let update_ix = program .request() .accounts(zero_copy::accounts::UpdateFoo {