Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom tpu client #105

Merged
merged 8 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
854 changes: 379 additions & 475 deletions Cargo.lock

Large diffs are not rendered by default.

71 changes: 55 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,31 @@ name = "lite-rpc"
version = "0.1.0"
edition = "2021"
description = "A lite version of solana rpc to send and confirm transactions"
rust-version = "1.67.1"

[workspace]
members = [
"bench"
]

[dev-dependencies]
bench = { path = "./bench" }

[dependencies]
solana-sdk = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-rpc-client = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-rpc-client-api= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-tpu-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-quic-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-pubsub-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-transaction-status = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-version= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
[workspace.dependencies]
solana-sdk = "1.15.2"
solana-rpc-client = "1.15.2"
solana-rpc-client-api = "1.15.2"
solana-transaction-status = "1.15.2"
solana-version = "1.15.2"
solana-client = "1.15.2"
solana-net-utils = "1.15.2"
solana-pubsub-client = "1.15.2"
solana-streamer = "1.15.2"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.93"
tokio = { version = "1.25.0", features = ["full"]}
tokio = { version = "1.27.0", features = ["full", "fs"]}
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
thiserror = "1.0.38"
futures = "0.3.26"
thiserror = "1.0.40"
futures = "0.3.28"
bytes = "1.4.0"
anyhow = "1.0.69"
log = "0.4.17"
Expand All @@ -45,3 +43,44 @@ prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "0.20.6", default-features = false }

[dev-dependencies]
bench = { path = "./bench" }

[dependencies]
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-transaction-status = { workspace = true }
solana-version = { workspace = true }
solana-client = { workspace = true }
solana-net-utils = { workspace = true }
solana-pubsub-client = { workspace = true }
solana-streamer = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
bincode = { workspace = true }
bs58 = { workspace = true }
base64 = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
clap = { workspace = true }
dashmap = { workspace = true }
const_env = { workspace = true }
jsonrpsee = { workspace = true }
tracing-subscriber = { workspace = true }
tokio-postgres = { workspace = true }
native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
dotenv = { workspace = true }
async-channel = { workspace = true }
quinn = { workspace = true }
rustls = { workspace = true }
23 changes: 12 additions & 11 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ version = "0.1.0"
edition = "2021"

[dependencies]
solana-sdk = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-rpc-client = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
log = "0.4.17"
anyhow = "1.0.69"
serde = "1.0.152"
serde_json = "1.0.93"
csv = "1.2.0"
clap = { version = "4.1.6", features = ["derive"] }
tokio = { version = "1.25.0", features = ["full", "fs"]}
tracing-subscriber = "0.3.16"
dirs = "4.0.0"
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
log = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
clap = { workspace = true }
tokio = { workspace = true }
tracing-subscriber = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
rand = "0.8.5"
rand_chacha = "0.3.1"

18 changes: 12 additions & 6 deletions bench/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;
use std::{str::FromStr, time::Duration};

use anyhow::Context;
use rand::{distributions::Alphanumeric, prelude::Distribution};
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig,
Expand All @@ -14,8 +14,10 @@ use solana_sdk::{
system_instruction,
transaction::Transaction,
};
use tokio::time::Instant;

const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
const WAIT_LIMIT_IN_SECONDS: u64 = 60;

pub struct BenchHelper;

Expand All @@ -40,7 +42,11 @@ impl BenchHelper {
sig: &Signature,
commitment_config: CommitmentConfig,
) -> anyhow::Result<()> {
let instant = Instant::now();
loop {
if instant.elapsed() > Duration::from_secs(WAIT_LIMIT_IN_SECONDS) {
return Err(anyhow::Error::msg("Timedout waiting"));
}
if let Some(err) = rpc_client
.get_signature_status_with_commitment(sig, commitment_config)
.await?
Expand Down Expand Up @@ -68,13 +74,13 @@ impl BenchHelper {
num_of_txs: usize,
funded_payer: &Keypair,
blockhash: Hash,
random_seed: Option<u64>,
) -> Vec<Transaction> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| {
let random_bytes: Vec<u8> = Alphanumeric
.sample_iter(rand::thread_rng())
.take(10)
.collect();
let random_bytes: Vec<u8> = Alphanumeric.sample_iter(&mut rng).take(10).collect();

Self::create_memo_tx(&random_bytes, funded_payer, blockhash)
})
Expand Down
2 changes: 1 addition & 1 deletion bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();

let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash);
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);

let mut un_confirmed_txs: HashMap<Signature, Option<Instant>> =
HashMap::with_capacity(txs.len());
Expand Down
61 changes: 38 additions & 23 deletions src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
encoding::BinaryEncoding,
rpc::LiteRpcServer,
tpu_manager::TpuManager,
workers::{
BlockListener, Cleaner, MetricsCapture, Postgres, PrometheusSync, TxSender, WireTransaction,
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
PrometheusSync, TxSender, WireTransaction,
},
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
};

use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};

use anyhow::bail;

use log::info;
use log::{error, info};

use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};

Expand All @@ -30,7 +31,7 @@ use solana_sdk::{
use solana_transaction_status::TransactionStatus;
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, UnboundedSender},
sync::mpsc::{self, Sender},
task::JoinHandle,
};

Expand All @@ -55,9 +56,9 @@ lazy_static::lazy_static! {
/// A bridge between clients and tpu
pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>,
pub tpu_manager: Arc<TpuManager>,
pub tpu_service: Arc<TpuService>,
// None if LiteBridge is not executed
pub tx_send_channel: Option<UnboundedSender<(String, WireTransaction, u64)>>,
pub tx_send_channel: Option<Sender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender,
pub block_listner: BlockListener,
pub block_store: BlockStore,
Expand All @@ -71,11 +72,19 @@ impl LiteBridge {
identity: Keypair,
) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let current_slot = rpc_client.get_slot().await?;

let tpu_service = TpuService::new(
Arc::new(std::sync::atomic::AtomicU64::new(current_slot)),
fanout_slots,
Arc::new(identity),
rpc_client.clone(),
ws_addr,
)
.await?;
let tpu_service = Arc::new(tpu_service);

let tpu_manager =
Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots, identity).await?);

let tx_sender = TxSender::new(tpu_manager.clone());
let tx_sender = TxSender::new(tpu_service.clone());

let block_store = BlockStore::new(&rpc_client).await?;

Expand All @@ -84,7 +93,7 @@ impl LiteBridge {

Ok(Self {
rpc_client,
tpu_manager,
tpu_service,
tx_send_channel: None,
tx_sender,
block_listner,
Expand All @@ -98,8 +107,6 @@ impl LiteBridge {
mut self,
http_addr: T,
ws_addr: T,
tx_batch_size: usize,
tx_send_interval: Duration,
clean_interval: Duration,
enable_postgres: bool,
prometheus_addr: T,
Expand All @@ -114,15 +121,15 @@ impl LiteBridge {
(None, None)
};

let (tx_send, tx_recv) = mpsc::unbounded_channel();
let mut tpu_services = self.tpu_service.start().await?;

let (tx_send, tx_recv) = mpsc::channel(DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE);
self.tx_send_channel = Some(tx_send);

let tx_sender = self.tx_sender.clone().execute(
tx_recv,
tx_batch_size,
tx_send_interval,
postgres_send.clone(),
);
let tx_sender = self
.tx_sender
.clone()
.execute(tx_recv, postgres_send.clone());

let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
let prometheus_sync = PrometheusSync.sync(prometheus_addr);
Expand All @@ -141,7 +148,6 @@ impl LiteBridge {
self.tx_sender.clone(),
self.block_listner.clone(),
self.block_store.clone(),
self.tpu_manager.clone(),
)
.start(clean_interval);

Expand Down Expand Up @@ -186,6 +192,8 @@ impl LiteBridge {
cleaner,
];

services.append(&mut tpu_services);

if let Some(postgres) = postgres {
services.push(postgres);
}
Expand Down Expand Up @@ -231,11 +239,18 @@ impl LiteRpcServer for LiteBridge {
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string()));
};

self.tx_send_channel
if let Err(e) = self
.tx_send_channel
.as_ref()
.expect("Lite Bridge Not Executed")
.send((sig.to_string(), raw_tx, slot))
.unwrap();
.await
{
error!(
"Internal error sending transaction on send channel error {}",
e
);
}
TXS_IN_CHANNEL.inc();

Ok(BinaryEncoding::Base58.encode(sig))
Expand Down
11 changes: 1 addition & 10 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS,
DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR,
};
use crate::{DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR};
use clap::Parser;

#[derive(Parser, Debug)]
Expand All @@ -15,15 +12,9 @@ pub struct Args {
pub lite_rpc_http_addr: String,
#[arg(short = 's', long, default_value_t = String::from("[::]:8891"))]
pub lite_rpc_ws_addr: String,
/// batch size of each batch forward
#[arg(short = 'b', long, default_value_t = DEFAULT_TX_BATCH_SIZE)]
pub tx_batch_size: usize,
/// tpu fanout
#[arg(short = 'f', long, default_value_t = DEFAULT_FANOUT_SIZE) ]
pub fanout_size: u64,
/// interval between each batch forward
#[arg(short = 'i', long, default_value_t = DEFAULT_TX_BATCH_INTERVAL_MS)]
pub tx_batch_interval_ms: u64,
/// interval between clean
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64,
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod configs;
pub mod encoding;
pub mod errors;
pub mod rpc;
pub mod tpu_manager;
pub mod workers;

#[from_env]
Expand All @@ -20,13 +19,14 @@ pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
#[from_env]
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
#[from_env]
pub const DEFAULT_TX_BATCH_SIZE: usize = 512;
pub const DEFAULT_TX_BATCH_SIZE: usize = 32;

/// 25 slots in 10s send to little more leaders
#[from_env]
pub const DEFAULT_FANOUT_SIZE: u64 = 30;
pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;

/// 25 slots in 10s send to little more leaders
#[from_env]
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 100;
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
#[from_env]
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
Expand Down
Loading