From 50907f36edb8baf462a8670073f07783d5eba24e Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 21 Aug 2023 16:56:40 +0200 Subject: [PATCH 1/6] cleanup/consolidate tests - remove redundant integration test (literpc_tpu_quic_server_integrationtest.rs) and use the quic_proxy_tpu_integrationtest.rs for everything - remove some commands from test.bash --- Cargo.toml | 2 +- .../Cargo.toml | 2 +- ...d_transactions_quic_tpu_integrationtest.rs | 0 quic-forward-proxy/src/lib.rs | 2 +- ...literpc_tpu_quic_server_integrationtest.rs | 488 ------------------ test.bash | 8 +- 6 files changed, 5 insertions(+), 497 deletions(-) rename {quic-forward-proxy-integration-test => integration-test}/Cargo.toml (96%) rename quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs => integration-test/tests/send_transactions_quic_tpu_integrationtest.rs (100%) delete mode 100644 services/tests/literpc_tpu_quic_server_integrationtest.rs diff --git a/Cargo.toml b/Cargo.toml index ae137d83..7e986afd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "services", "lite-rpc", "quic-forward-proxy", - "quic-forward-proxy-integration-test", + "integration-test", "bench" ] diff --git a/quic-forward-proxy-integration-test/Cargo.toml b/integration-test/Cargo.toml similarity index 96% rename from quic-forward-proxy-integration-test/Cargo.toml rename to integration-test/Cargo.toml index bc5618fa..40732095 100644 --- a/quic-forward-proxy-integration-test/Cargo.toml +++ b/integration-test/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "solana-lite-rpc-quic-forward-proxy-integration-test" +name = "integration-test" version = "0.1.0" edition = "2021" description = "Integration test for quic proxy " diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/integration-test/tests/send_transactions_quic_tpu_integrationtest.rs similarity index 100% rename from quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs rename to integration-test/tests/send_transactions_quic_tpu_integrationtest.rs diff --git a/quic-forward-proxy/src/lib.rs b/quic-forward-proxy/src/lib.rs index ffc51a52..212adcdb 100644 --- a/quic-forward-proxy/src/lib.rs +++ b/quic-forward-proxy/src/lib.rs @@ -1,4 +1,4 @@ -// lib definition is only required for 'quic-forward-proxy-integration-test' to work +// lib definition is only required for 'integration-test' to work mod cli; mod inbound; diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs deleted file mode 100644 index e1a92bac..00000000 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ /dev/null @@ -1,488 +0,0 @@ -use countmap::CountMap; -use crossbeam_channel::Sender; - -use log::{debug, error, info, trace, warn}; - -use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; -use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; -use solana_lite_rpc_core::tx_store::empty_tx_store; -use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; -use solana_rpc_client::rpc_client::SerializableTransaction; -use solana_sdk::hash::Hash; -use solana_sdk::instruction::Instruction; -use solana_sdk::message::Message; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, Signature, Signer}; - -use solana_sdk::transaction::{Transaction, VersionedTransaction}; -use solana_streamer::nonblocking::quic::ConnectionPeerType; -use solana_streamer::packet::PacketBatch; -use solana_streamer::quic::StreamStats; -use solana_streamer::streamer::StakedNodes; -use solana_streamer::tls_certificates::new_self_signed_tls_certificate; -use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; - -use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::{Duration, Instant}; -use tokio::runtime::Builder; -use tokio::task::JoinHandle; -use tokio::time::sleep; -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::EnvFilter; - -#[derive(Copy, Clone, Debug)] -struct TestCaseParams { - sample_tx_count: u32, - stake_connection: bool, -} - -const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; -const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo - -const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { - connection_timeout: Duration::from_secs(2), - connection_retry_count: 10, - finalize_timeout: Duration::from_secs(2), - max_number_of_connections: 8, - unistream_timeout: Duration::from_secs(2), - write_timeout: Duration::from_secs(2), - number_of_transactions_per_unistream: 10, -}; - -#[test] -pub fn small_tx_batch_staked() { - configure_logging(true); - - wireup_and_send_txs_via_channel(TestCaseParams { - sample_tx_count: 20, - stake_connection: true, - }); -} - -#[test] -pub fn small_tx_batch_unstaked() { - configure_logging(true); - - wireup_and_send_txs_via_channel(TestCaseParams { - sample_tx_count: 20, - stake_connection: false, - }); -} - -// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) -#[ignore] -#[test] -pub fn many_transactions() { - configure_logging(false); - - wireup_and_send_txs_via_channel(TestCaseParams { - sample_tx_count: 10000, - stake_connection: true, - }); -} - -// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) -#[ignore] -#[test] -pub fn too_many_transactions() { - configure_logging(false); - - wireup_and_send_txs_via_channel(TestCaseParams { - sample_tx_count: 100000, - stake_connection: false, - }); -} - -// note: this not a tokio test as runtimes get created as part of the integration test -fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) { - configure_panic_hook(); - // value from solana - see quic streamer - see quic.rs -> rt() - const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; - let runtime_quic1 = Builder::new_multi_thread() - .worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS) - .thread_name("quic-server") - .enable_all() - .build() - .expect("failed to build tokio runtime for testing quic server"); - - // lite-rpc - let runtime_literpc = Builder::new_multi_thread() - // see lite-rpc -> main.rs - .worker_threads(16) // also works with 1 - .enable_all() - .build() - .expect("failed to build tokio runtime for lite-rpc-tpu-client"); - - let literpc_validator_identity = Arc::new(Keypair::new()); - let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - let listen_addr = udp_listen_socket.local_addr().unwrap(); - - let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); - - runtime_quic1.block_on(async { - // see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }" - let staked_nodes = StakedNodes { - total_stake: 100, - max_stake: 40, - min_stake: 0, - ip_stake_map: Default::default(), - pubkey_stake_map: if test_case_params.stake_connection { - let mut map = HashMap::new(); - map.insert(literpc_validator_identity.pubkey(), 30); - map - } else { - HashMap::default() - }, - }; - - let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening( - udp_listen_socket, - inbound_packets_sender, - staked_nodes, - ); - }); - - runtime_literpc.block_on(async { - tokio::spawn(start_literpc_client( - test_case_params, - listen_addr, - literpc_validator_identity, - )); - }); - - let packet_consumer_jh = thread::spawn(move || { - info!("start pulling packets..."); - let mut packet_count = 0; - let time_to_first = Instant::now(); - let mut latest_tx = Instant::now(); - let timer = Instant::now(); - // second half - let mut timer2 = None; - let mut packet_count2 = 0; - let mut count_map: CountMap = - CountMap::with_capacity(test_case_params.sample_tx_count as usize); - let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2; - while (count_map.len() as u32) < test_case_params.sample_tx_count { - if latest_tx.elapsed() > Duration::from_secs(50) { - warn!("abort after timeout waiting for packet from quic streamer"); - break; - } - - let packet_batch = match inbound_packets_receiver - .recv_timeout(Duration::from_millis(500)) - { - Ok(batch) => batch, - Err(_) => { - debug!("consumer thread did not receive packets on inbound channel recently - continue polling"); - continue; - } - }; - - // reset timer - latest_tx = Instant::now(); - - if packet_count == 0 { - info!( - "time to first packet {}ms", - time_to_first.elapsed().as_millis() - ); - } - - packet_count += packet_batch.len() as u32; - if timer2.is_some() { - packet_count2 += packet_batch.len() as u32; - } - - for packet in packet_batch.iter() { - let tx = packet - .deserialize_slice::(..) - .unwrap(); - trace!( - "read transaction from quic streaming server: {:?}", - tx.get_signature() - ); - count_map.insert_or_increment(*tx.get_signature()); - } - - if packet_count == warmup_tx_count { - timer2 = Some(Instant::now()); - } - if packet_count == test_case_params.sample_tx_count { - break; - } - } // -- while not all packets received - by count - - let total_duration = timer.elapsed(); - let half_duration = timer2 - .map(|t| t.elapsed()) - .unwrap_or(Duration::from_secs(3333)); - - // throughput_50 is second half of transactions - should iron out startup effects - info!( - "consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, ", - packet_count, - total_duration.as_micros(), - packet_count as f64 / total_duration.as_secs_f64(), - packet_count2 as f64 / half_duration.as_secs_f64(), - ); - - info!("got all expected packets - shutting down tokio runtime with lite-rpc client"); - - assert_eq!( - count_map.len() as u32, - test_case_params.sample_tx_count, - "count_map size should be equal to sample_tx_count" - ); - assert!( - count_map.values().all(|cnt| *cnt == 1), - "all transactions should be unique" - ); - - runtime_literpc.shutdown_timeout(Duration::from_millis(1000)); - }); - - packet_consumer_jh.join().unwrap(); -} - -fn configure_panic_hook() { - let default_panic = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |panic_info| { - default_panic(panic_info); - if let Some(location) = panic_info.location() { - error!( - "panic occurred in file '{}' at line {}", - location.file(), - location.line(), - ); - } else { - error!("panic occurred but can't get location information..."); - } - // note: we do not exit the process to allow proper test execution - })); -} - -fn configure_logging(verbose: bool) { - let env_filter = if verbose { - "debug,rustls=info,quinn=info,quinn_proto=debug,solana_streamer=debug,solana_lite_rpc_quic_forward_proxy=trace" - } else { - "info,rustls=info,quinn=info,quinn_proto=info,solana_streamer=info,solana_lite_rpc_quic_forward_proxy=info" - }; - let span_mode = if verbose { - FmtSpan::CLOSE - } else { - FmtSpan::NONE - }; - // EnvFilter::try_from_default_env().unwrap_or(env_filter) - let filter = - EnvFilter::try_from_default_env().unwrap_or(EnvFilter::from_str(env_filter).unwrap()); - - let result = tracing_subscriber::fmt::fmt() - .with_env_filter(filter) - .with_span_events(span_mode) - .try_init(); - if result.is_err() { - println!("Logging already initialized - ignore"); - } -} - -async fn start_literpc_client( - test_case_params: TestCaseParams, - streamer_listen_addrs: SocketAddr, - literpc_validator_identity: Arc, -) -> anyhow::Result<()> { - info!("Start lite-rpc test client ..."); - - let fanout_slots = 4; - - // (String, Vec) (signature, transaction) - let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); - let broadcast_sender = Arc::new(sender); - let (certificate, key) = new_self_signed_tls_certificate( - literpc_validator_identity.as_ref(), - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - ) - .expect("Failed to initialize QUIC connection certificates"); - - let tpu_connection_manager = - TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; - - // this effectively controls how many connections we will have - let mut connections_to_keep: HashMap = HashMap::new(); - let addr1 = UdpSocket::bind("127.0.0.1:0") - .unwrap() - .local_addr() - .unwrap(); - connections_to_keep.insert( - Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, - addr1, - ); - - let addr2 = UdpSocket::bind("127.0.0.1:0") - .unwrap() - .local_addr() - .unwrap(); - connections_to_keep.insert( - Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, - addr2, - ); - - // this is the real streamer - connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); - - // get information about the optional validator identity stake - // populated from get_stakes_for_identity() - let identity_stakes = IdentityStakes { - peer_type: ConnectionPeerType::Staked, - stakes: if test_case_params.stake_connection { - 30 - } else { - 0 - }, // stake of lite-rpc - min_stakes: 0, - max_stakes: 40, - total_stakes: 100, - }; - - // solana_streamer::nonblocking::quic: Peer type: Staked, stake 30, total stake 0, max streams 128 receive_window Ok(12320) from peer 127.0.0.1:8000 - - tpu_connection_manager - .update_connections( - broadcast_sender.clone(), - connections_to_keep, - identity_stakes, - // note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates - empty_tx_store().clone(), - QUIC_CONNECTION_PARAMS, - ) - .await; - - for i in 0..test_case_params.sample_tx_count { - let raw_sample_tx = build_raw_sample_tx(i); - broadcast_sender.send(raw_sample_tx)?; - } - - // we need that to keep the tokio runtime dedicated to lite-rpc up long enough - sleep(Duration::from_secs(30)).await; - - // reaching this point means there is problem with test setup and the consumer threed - panic!("should never reach this point") -} - -#[tokio::test] -// taken from solana -> test_nonblocking_quic_client_multiple_writes -async fn solana_quic_streamer_start() { - let (sender, _receiver) = crossbeam_channel::unbounded(); - let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - // will create random free port - let sock = UdpSocket::bind("127.0.0.1:0").unwrap(); - let exit = Arc::new(AtomicBool::new(false)); - // keypair to derive the server tls certificate - let keypair = Keypair::new(); - // gossip_host is used in the server certificate - let gossip_host = "127.0.0.1".parse().unwrap(); - let stats = Arc::new(StreamStats::default()); - let (_, t) = solana_streamer::nonblocking::quic::spawn_server( - sock.try_clone().unwrap(), - &keypair, - gossip_host, - sender, - exit.clone(), - 1, - staked_nodes, - 10, - 10, - stats.clone(), - 1000, - ) - .unwrap(); - - let addr = sock.local_addr().unwrap().ip(); - let port = sock.local_addr().unwrap().port(); - let _tpu_addr = SocketAddr::new(addr, port); - - // sleep(Duration::from_millis(500)).await; - - exit.store(true, Ordering::Relaxed); - t.await.unwrap(); - - stats.report(); -} - -struct SolanaQuicStreamer { - _sock: UdpSocket, - _exit: Arc, - _join_handler: JoinHandle<()>, - _stats: Arc, -} - -impl SolanaQuicStreamer { - fn new_start_listening( - udp_socket: UdpSocket, - sender: Sender, - staked_nodes: StakedNodes, - ) -> Self { - let staked_nodes = Arc::new(RwLock::new(staked_nodes)); - let exit = Arc::new(AtomicBool::new(false)); - // keypair to derive the server tls certificate - let keypair = Keypair::new(); - // gossip_host is used in the server certificate - let gossip_host = "127.0.0.1".parse().unwrap(); - let stats = Arc::new(StreamStats::default()); - let (_, jh) = solana_streamer::nonblocking::quic::spawn_server( - udp_socket.try_clone().unwrap(), - &keypair, - gossip_host, - sender, - exit.clone(), - MAX_QUIC_CONNECTIONS_PER_PEER, - staked_nodes, - 10, - 10, - stats.clone(), - 1000, - ) - .unwrap(); - - let addr = udp_socket.local_addr().unwrap().ip(); - let port = udp_socket.local_addr().unwrap().port(); - let _tpu_addr = SocketAddr::new(addr, port); - - Self { - _sock: udp_socket, - _exit: exit, - _join_handler: jh, - _stats: stats, - } - } -} - -const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; - -pub fn build_raw_sample_tx(i: u32) -> (String, Vec) { - let payer_keypair = Keypair::from_base58_string( - "rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr", - ); - - let tx = build_sample_tx(&payer_keypair, i); - - let raw_tx = bincode::serialize::(&tx).expect("failed to serialize tx"); - - (tx.get_signature().to_string(), raw_tx) -} - -fn build_sample_tx(payer_keypair: &Keypair, i: u32) -> VersionedTransaction { - let blockhash = Hash::default(); - create_memo_tx(format!("hi {}", i).as_bytes(), payer_keypair, blockhash).into() -} - -fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { - let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); - - let instruction = Instruction::new_with_bytes(memo, msg, vec![]); - let message = Message::new(&[instruction], Some(&payer.pubkey())); - Transaction::new(&[payer], message, blockhash) -} diff --git a/test.bash b/test.bash index 33bf4222..ce1d3dbf 100755 --- a/test.bash +++ b/test.bash @@ -3,10 +3,6 @@ # kill background jobs on exit/failure trap 'kill $(jobs -pr)' SIGINT SIGTERM EXIT -echo "Doing an early build" -cargo build --workspace --tests -yarn - echo "Switching to local lite-rpc rpc config" solana config set --url "http://0.0.0.0:8899" @@ -19,8 +15,8 @@ sleep 20 && solana airdrop 10000 echo "Starting LiteRpc" cargo run & -echo "Running cargo tests in 20s" -sleep 20 && cargo test +echo "Waiting 20s for startup" +sleep 20 echo "Running yarn tests" yarn test From 311ac390e866bfd3ee531e17c75f46cffd198410 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 21 Aug 2023 17:48:55 +0200 Subject: [PATCH 2/6] ignore more flakey quic tests --- .../tests/quic_proxy_tpu_integrationtest.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index e52e513b..825936c4 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -144,6 +144,8 @@ pub fn with_10000_transactions_direct() { }); } +// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) +#[ignore] #[test] pub fn with_10000_transactions_proxy() { configure_logging(false); @@ -155,6 +157,8 @@ pub fn with_10000_transactions_proxy() { }); } +// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) +#[ignore] #[test] pub fn many_transactions_proxy() { configure_logging(false); From adc13bd67f2e5198b698f237c2f7e7f19a962961 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 21 Aug 2023 18:10:34 +0200 Subject: [PATCH 3/6] tpu agends must handle connection shutdown explicitly --- quic-forward-proxy/src/outbound/tx_forward.rs | 4 ++- .../src/quinn_auto_reconnect.rs | 25 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/quic-forward-proxy/src/outbound/tx_forward.rs b/quic-forward-proxy/src/outbound/tx_forward.rs index d7224953..fb9f7a4f 100644 --- a/quic-forward-proxy/src/outbound/tx_forward.rs +++ b/quic-forward-proxy/src/outbound/tx_forward.rs @@ -181,8 +181,10 @@ pub async fn tx_forwarder( } } // -- while all packtes from channel + + auto_connection.force_shutdown().await; warn!( - "Quic forwarder agent #{} for TPU {} exited", + "Quic forwarder agent #{} for TPU {} exited; shut down connection", connection_idx, tpu_address ); }); // -- spawned thread for one connection to one TPU diff --git a/quic-forward-proxy/src/quinn_auto_reconnect.rs b/quic-forward-proxy/src/quinn_auto_reconnect.rs index 24ed35af..14c0120c 100644 --- a/quic-forward-proxy/src/quinn_auto_reconnect.rs +++ b/quic-forward-proxy/src/quinn_auto_reconnect.rs @@ -25,6 +25,7 @@ enum ConnectionState { Connection(Connection), PermanentError, FailedAttempt(u32), + ShuttingDown, } pub struct AutoReconnect { @@ -66,6 +67,7 @@ impl AutoReconnect { ConnectionState::Connection(conn) => Ok(conn.clone()), ConnectionState::PermanentError => bail!("permanent error"), ConnectionState::FailedAttempt(_) => bail!("failed connection attempt"), + ConnectionState::ShuttingDown => bail!("shutting down"), } } @@ -172,6 +174,13 @@ impl AutoReconnect { } }; } + ConnectionState::ShuttingDown => { + // no nothing + debug!( + "Not using connection to {} that's shutting down", + self.target_address + ); + } } } @@ -194,6 +203,21 @@ impl AutoReconnect { } } + /// close connection without any sophisticated handling; assumes that send buffers were flushed by .send etc. + pub async fn force_shutdown(&self) { + let mut lock = self.current.write().await; + match &*lock { + ConnectionState::Connection(conn) => { + conn.close(0u32.into(), b"client_shutdown"); + *lock = ConnectionState::ShuttingDown; + } + ConnectionState::NotConnected => *lock = ConnectionState::ShuttingDown, + ConnectionState::PermanentError => *lock = ConnectionState::ShuttingDown, + ConnectionState::FailedAttempt(_) => *lock = ConnectionState::ShuttingDown, + ConnectionState::ShuttingDown => *lock = ConnectionState::ShuttingDown, + } + } + // stable_id 140266619216912, rtt=2.156683ms, // stats FrameStats { ACK: 3, CONNECTION_CLOSE: 0, CRYPTO: 3, // DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1, MAX_DATA: 0, @@ -213,6 +237,7 @@ impl AutoReconnect { ConnectionState::NotConnected => "n/c".to_string(), ConnectionState::PermanentError => "n/a (permanent)".to_string(), ConnectionState::FailedAttempt(_) => "fail".to_string(), + ConnectionState::ShuttingDown => "shutdown".to_string(), } } } From 3d2f9af8b8daacc9ef0c8fcf9e7a7d58a2a5e911 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 24 Aug 2023 09:15:15 +0200 Subject: [PATCH 4/6] revert cargo test deduplication --- test.bash | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test.bash b/test.bash index ce1d3dbf..cc99c2b3 100755 --- a/test.bash +++ b/test.bash @@ -15,8 +15,8 @@ sleep 20 && solana airdrop 10000 echo "Starting LiteRpc" cargo run & -echo "Waiting 20s for startup" -sleep 20 +echo "Running cargo tests in 20s" +sleep 20 && cargo test echo "Running yarn tests" yarn test From 9263b11e8c85b02943f8094aedb38faaff95a988 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 29 Aug 2023 01:13:08 +0200 Subject: [PATCH 5/6] code fmt --- core/src/block_processor.rs | 10 ++++++++-- lite-rpc/src/postgres.rs | 2 +- services/src/prometheus_sync.rs | 2 +- services/src/transaction_service.rs | 6 +++++- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/block_processor.rs b/core/src/block_processor.rs index 86d80b36..b9239f0f 100644 --- a/core/src/block_processor.rs +++ b/core/src/block_processor.rs @@ -86,7 +86,7 @@ impl BlockProcessor { let Some(transactions) = block.transactions else { return Ok(BlockProcessorResult::invalid()); - }; + }; let blockhash = block.blockhash; let parent_slot = block.parent_slot; @@ -110,7 +110,13 @@ impl BlockProcessor { let mut transaction_infos = vec![]; transaction_infos.reserve(transactions.len()); for tx in transactions { - let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else { + let Some(UiTransactionStatusMeta { + err, + status, + compute_units_consumed, + .. + }) = tx.meta + else { info!("tx with no meta"); continue; }; diff --git a/lite-rpc/src/postgres.rs b/lite-rpc/src/postgres.rs index dd920833..a6e84ff5 100644 --- a/lite-rpc/src/postgres.rs +++ b/lite-rpc/src/postgres.rs @@ -480,7 +480,7 @@ impl Postgres { let Ok(session) = session else { POSTGRES_SESSION_ERRORS.inc(); - const TIME_OUT:Duration = Duration::from_millis(1000); + const TIME_OUT: Duration = Duration::from_millis(1000); warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}"); tokio::time::sleep(TIME_OUT).await; diff --git a/services/src/prometheus_sync.rs b/services/src/prometheus_sync.rs index 3a8a9973..fad8a378 100644 --- a/services/src/prometheus_sync.rs +++ b/services/src/prometheus_sync.rs @@ -44,7 +44,7 @@ impl PrometheusSync { let listener = TcpListener::bind(addr).await?; loop { - let Ok((mut stream, _addr)) = listener.accept().await else { + let Ok((mut stream, _addr)) = listener.accept().await else { error!("Error accepting prometheus stream"); tokio::time::sleep(Duration::from_millis(1)).await; continue; diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index 8cacf59e..f577f76b 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -155,7 +155,11 @@ impl TransactionService { }; let signature = tx.signatures[0]; - let Some(BlockInformation { slot, last_valid_blockheight, .. }) = self + let Some(BlockInformation { + slot, + last_valid_blockheight, + .. + }) = self .block_store .get_block_info(&tx.get_recent_blockhash().to_string()) else { From 1fb85729ad91e07f5e21b5f0f8336fbc4d4d3ae5 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 29 Aug 2023 17:16:48 +0200 Subject: [PATCH 6/6] Revert "cleanup/consolidate tests" This reverts commit 50907f36edb8baf462a8670073f07783d5eba24e. --- Cargo.toml | 2 +- .../Cargo.toml | 2 +- .../tests/quic_proxy_tpu_integrationtest.rs | 0 quic-forward-proxy/src/lib.rs | 2 +- ...literpc_tpu_quic_server_integrationtest.rs | 488 ++++++++++++++++++ test.bash | 4 + 6 files changed, 495 insertions(+), 3 deletions(-) rename {integration-test => quic-forward-proxy-integration-test}/Cargo.toml (96%) rename integration-test/tests/send_transactions_quic_tpu_integrationtest.rs => quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs (100%) create mode 100644 services/tests/literpc_tpu_quic_server_integrationtest.rs diff --git a/Cargo.toml b/Cargo.toml index 7e986afd..ae137d83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "services", "lite-rpc", "quic-forward-proxy", - "integration-test", + "quic-forward-proxy-integration-test", "bench" ] diff --git a/integration-test/Cargo.toml b/quic-forward-proxy-integration-test/Cargo.toml similarity index 96% rename from integration-test/Cargo.toml rename to quic-forward-proxy-integration-test/Cargo.toml index 40732095..bc5618fa 100644 --- a/integration-test/Cargo.toml +++ b/quic-forward-proxy-integration-test/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "integration-test" +name = "solana-lite-rpc-quic-forward-proxy-integration-test" version = "0.1.0" edition = "2021" description = "Integration test for quic proxy " diff --git a/integration-test/tests/send_transactions_quic_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs similarity index 100% rename from integration-test/tests/send_transactions_quic_tpu_integrationtest.rs rename to quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs diff --git a/quic-forward-proxy/src/lib.rs b/quic-forward-proxy/src/lib.rs index 212adcdb..ffc51a52 100644 --- a/quic-forward-proxy/src/lib.rs +++ b/quic-forward-proxy/src/lib.rs @@ -1,4 +1,4 @@ -// lib definition is only required for 'integration-test' to work +// lib definition is only required for 'quic-forward-proxy-integration-test' to work mod cli; mod inbound; diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs new file mode 100644 index 00000000..e1a92bac --- /dev/null +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -0,0 +1,488 @@ +use countmap::CountMap; +use crossbeam_channel::Sender; + +use log::{debug, error, info, trace, warn}; + +use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; +use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; +use solana_lite_rpc_core::tx_store::empty_tx_store; +use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; +use solana_rpc_client::rpc_client::SerializableTransaction; +use solana_sdk::hash::Hash; +use solana_sdk::instruction::Instruction; +use solana_sdk::message::Message; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, Signature, Signer}; + +use solana_sdk::transaction::{Transaction, VersionedTransaction}; +use solana_streamer::nonblocking::quic::ConnectionPeerType; +use solana_streamer::packet::PacketBatch; +use solana_streamer::quic::StreamStats; +use solana_streamer::streamer::StakedNodes; +use solana_streamer::tls_certificates::new_self_signed_tls_certificate; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; + +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::{Duration, Instant}; +use tokio::runtime::Builder; +use tokio::task::JoinHandle; +use tokio::time::sleep; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::EnvFilter; + +#[derive(Copy, Clone, Debug)] +struct TestCaseParams { + sample_tx_count: u32, + stake_connection: bool, +} + +const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; +const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo + +const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { + connection_timeout: Duration::from_secs(2), + connection_retry_count: 10, + finalize_timeout: Duration::from_secs(2), + max_number_of_connections: 8, + unistream_timeout: Duration::from_secs(2), + write_timeout: Duration::from_secs(2), + number_of_transactions_per_unistream: 10, +}; + +#[test] +pub fn small_tx_batch_staked() { + configure_logging(true); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 20, + stake_connection: true, + }); +} + +#[test] +pub fn small_tx_batch_unstaked() { + configure_logging(true); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 20, + stake_connection: false, + }); +} + +// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) +#[ignore] +#[test] +pub fn many_transactions() { + configure_logging(false); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 10000, + stake_connection: true, + }); +} + +// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) +#[ignore] +#[test] +pub fn too_many_transactions() { + configure_logging(false); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 100000, + stake_connection: false, + }); +} + +// note: this not a tokio test as runtimes get created as part of the integration test +fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) { + configure_panic_hook(); + // value from solana - see quic streamer - see quic.rs -> rt() + const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; + let runtime_quic1 = Builder::new_multi_thread() + .worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS) + .thread_name("quic-server") + .enable_all() + .build() + .expect("failed to build tokio runtime for testing quic server"); + + // lite-rpc + let runtime_literpc = Builder::new_multi_thread() + // see lite-rpc -> main.rs + .worker_threads(16) // also works with 1 + .enable_all() + .build() + .expect("failed to build tokio runtime for lite-rpc-tpu-client"); + + let literpc_validator_identity = Arc::new(Keypair::new()); + let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let listen_addr = udp_listen_socket.local_addr().unwrap(); + + let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); + + runtime_quic1.block_on(async { + // see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }" + let staked_nodes = StakedNodes { + total_stake: 100, + max_stake: 40, + min_stake: 0, + ip_stake_map: Default::default(), + pubkey_stake_map: if test_case_params.stake_connection { + let mut map = HashMap::new(); + map.insert(literpc_validator_identity.pubkey(), 30); + map + } else { + HashMap::default() + }, + }; + + let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening( + udp_listen_socket, + inbound_packets_sender, + staked_nodes, + ); + }); + + runtime_literpc.block_on(async { + tokio::spawn(start_literpc_client( + test_case_params, + listen_addr, + literpc_validator_identity, + )); + }); + + let packet_consumer_jh = thread::spawn(move || { + info!("start pulling packets..."); + let mut packet_count = 0; + let time_to_first = Instant::now(); + let mut latest_tx = Instant::now(); + let timer = Instant::now(); + // second half + let mut timer2 = None; + let mut packet_count2 = 0; + let mut count_map: CountMap = + CountMap::with_capacity(test_case_params.sample_tx_count as usize); + let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2; + while (count_map.len() as u32) < test_case_params.sample_tx_count { + if latest_tx.elapsed() > Duration::from_secs(50) { + warn!("abort after timeout waiting for packet from quic streamer"); + break; + } + + let packet_batch = match inbound_packets_receiver + .recv_timeout(Duration::from_millis(500)) + { + Ok(batch) => batch, + Err(_) => { + debug!("consumer thread did not receive packets on inbound channel recently - continue polling"); + continue; + } + }; + + // reset timer + latest_tx = Instant::now(); + + if packet_count == 0 { + info!( + "time to first packet {}ms", + time_to_first.elapsed().as_millis() + ); + } + + packet_count += packet_batch.len() as u32; + if timer2.is_some() { + packet_count2 += packet_batch.len() as u32; + } + + for packet in packet_batch.iter() { + let tx = packet + .deserialize_slice::(..) + .unwrap(); + trace!( + "read transaction from quic streaming server: {:?}", + tx.get_signature() + ); + count_map.insert_or_increment(*tx.get_signature()); + } + + if packet_count == warmup_tx_count { + timer2 = Some(Instant::now()); + } + if packet_count == test_case_params.sample_tx_count { + break; + } + } // -- while not all packets received - by count + + let total_duration = timer.elapsed(); + let half_duration = timer2 + .map(|t| t.elapsed()) + .unwrap_or(Duration::from_secs(3333)); + + // throughput_50 is second half of transactions - should iron out startup effects + info!( + "consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, ", + packet_count, + total_duration.as_micros(), + packet_count as f64 / total_duration.as_secs_f64(), + packet_count2 as f64 / half_duration.as_secs_f64(), + ); + + info!("got all expected packets - shutting down tokio runtime with lite-rpc client"); + + assert_eq!( + count_map.len() as u32, + test_case_params.sample_tx_count, + "count_map size should be equal to sample_tx_count" + ); + assert!( + count_map.values().all(|cnt| *cnt == 1), + "all transactions should be unique" + ); + + runtime_literpc.shutdown_timeout(Duration::from_millis(1000)); + }); + + packet_consumer_jh.join().unwrap(); +} + +fn configure_panic_hook() { + let default_panic = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic_info| { + default_panic(panic_info); + if let Some(location) = panic_info.location() { + error!( + "panic occurred in file '{}' at line {}", + location.file(), + location.line(), + ); + } else { + error!("panic occurred but can't get location information..."); + } + // note: we do not exit the process to allow proper test execution + })); +} + +fn configure_logging(verbose: bool) { + let env_filter = if verbose { + "debug,rustls=info,quinn=info,quinn_proto=debug,solana_streamer=debug,solana_lite_rpc_quic_forward_proxy=trace" + } else { + "info,rustls=info,quinn=info,quinn_proto=info,solana_streamer=info,solana_lite_rpc_quic_forward_proxy=info" + }; + let span_mode = if verbose { + FmtSpan::CLOSE + } else { + FmtSpan::NONE + }; + // EnvFilter::try_from_default_env().unwrap_or(env_filter) + let filter = + EnvFilter::try_from_default_env().unwrap_or(EnvFilter::from_str(env_filter).unwrap()); + + let result = tracing_subscriber::fmt::fmt() + .with_env_filter(filter) + .with_span_events(span_mode) + .try_init(); + if result.is_err() { + println!("Logging already initialized - ignore"); + } +} + +async fn start_literpc_client( + test_case_params: TestCaseParams, + streamer_listen_addrs: SocketAddr, + literpc_validator_identity: Arc, +) -> anyhow::Result<()> { + info!("Start lite-rpc test client ..."); + + let fanout_slots = 4; + + // (String, Vec) (signature, transaction) + let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); + let broadcast_sender = Arc::new(sender); + let (certificate, key) = new_self_signed_tls_certificate( + literpc_validator_identity.as_ref(), + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + ) + .expect("Failed to initialize QUIC connection certificates"); + + let tpu_connection_manager = + TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; + + // this effectively controls how many connections we will have + let mut connections_to_keep: HashMap = HashMap::new(); + let addr1 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, + addr1, + ); + + let addr2 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, + addr2, + ); + + // this is the real streamer + connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); + + // get information about the optional validator identity stake + // populated from get_stakes_for_identity() + let identity_stakes = IdentityStakes { + peer_type: ConnectionPeerType::Staked, + stakes: if test_case_params.stake_connection { + 30 + } else { + 0 + }, // stake of lite-rpc + min_stakes: 0, + max_stakes: 40, + total_stakes: 100, + }; + + // solana_streamer::nonblocking::quic: Peer type: Staked, stake 30, total stake 0, max streams 128 receive_window Ok(12320) from peer 127.0.0.1:8000 + + tpu_connection_manager + .update_connections( + broadcast_sender.clone(), + connections_to_keep, + identity_stakes, + // note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates + empty_tx_store().clone(), + QUIC_CONNECTION_PARAMS, + ) + .await; + + for i in 0..test_case_params.sample_tx_count { + let raw_sample_tx = build_raw_sample_tx(i); + broadcast_sender.send(raw_sample_tx)?; + } + + // we need that to keep the tokio runtime dedicated to lite-rpc up long enough + sleep(Duration::from_secs(30)).await; + + // reaching this point means there is problem with test setup and the consumer threed + panic!("should never reach this point") +} + +#[tokio::test] +// taken from solana -> test_nonblocking_quic_client_multiple_writes +async fn solana_quic_streamer_start() { + let (sender, _receiver) = crossbeam_channel::unbounded(); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); + // will create random free port + let sock = UdpSocket::bind("127.0.0.1:0").unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + // keypair to derive the server tls certificate + let keypair = Keypair::new(); + // gossip_host is used in the server certificate + let gossip_host = "127.0.0.1".parse().unwrap(); + let stats = Arc::new(StreamStats::default()); + let (_, t) = solana_streamer::nonblocking::quic::spawn_server( + sock.try_clone().unwrap(), + &keypair, + gossip_host, + sender, + exit.clone(), + 1, + staked_nodes, + 10, + 10, + stats.clone(), + 1000, + ) + .unwrap(); + + let addr = sock.local_addr().unwrap().ip(); + let port = sock.local_addr().unwrap().port(); + let _tpu_addr = SocketAddr::new(addr, port); + + // sleep(Duration::from_millis(500)).await; + + exit.store(true, Ordering::Relaxed); + t.await.unwrap(); + + stats.report(); +} + +struct SolanaQuicStreamer { + _sock: UdpSocket, + _exit: Arc, + _join_handler: JoinHandle<()>, + _stats: Arc, +} + +impl SolanaQuicStreamer { + fn new_start_listening( + udp_socket: UdpSocket, + sender: Sender, + staked_nodes: StakedNodes, + ) -> Self { + let staked_nodes = Arc::new(RwLock::new(staked_nodes)); + let exit = Arc::new(AtomicBool::new(false)); + // keypair to derive the server tls certificate + let keypair = Keypair::new(); + // gossip_host is used in the server certificate + let gossip_host = "127.0.0.1".parse().unwrap(); + let stats = Arc::new(StreamStats::default()); + let (_, jh) = solana_streamer::nonblocking::quic::spawn_server( + udp_socket.try_clone().unwrap(), + &keypair, + gossip_host, + sender, + exit.clone(), + MAX_QUIC_CONNECTIONS_PER_PEER, + staked_nodes, + 10, + 10, + stats.clone(), + 1000, + ) + .unwrap(); + + let addr = udp_socket.local_addr().unwrap().ip(); + let port = udp_socket.local_addr().unwrap().port(); + let _tpu_addr = SocketAddr::new(addr, port); + + Self { + _sock: udp_socket, + _exit: exit, + _join_handler: jh, + _stats: stats, + } + } +} + +const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; + +pub fn build_raw_sample_tx(i: u32) -> (String, Vec) { + let payer_keypair = Keypair::from_base58_string( + "rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr", + ); + + let tx = build_sample_tx(&payer_keypair, i); + + let raw_tx = bincode::serialize::(&tx).expect("failed to serialize tx"); + + (tx.get_signature().to_string(), raw_tx) +} + +fn build_sample_tx(payer_keypair: &Keypair, i: u32) -> VersionedTransaction { + let blockhash = Hash::default(); + create_memo_tx(format!("hi {}", i).as_bytes(), payer_keypair, blockhash).into() +} + +fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { + let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); + + let instruction = Instruction::new_with_bytes(memo, msg, vec![]); + let message = Message::new(&[instruction], Some(&payer.pubkey())); + Transaction::new(&[payer], message, blockhash) +} diff --git a/test.bash b/test.bash index cc99c2b3..33bf4222 100755 --- a/test.bash +++ b/test.bash @@ -3,6 +3,10 @@ # kill background jobs on exit/failure trap 'kill $(jobs -pr)' SIGINT SIGTERM EXIT +echo "Doing an early build" +cargo build --workspace --tests +yarn + echo "Switching to local lite-rpc rpc config" solana config set --url "http://0.0.0.0:8899"