From 3e8fbfbf856b638d843153f6a80e3beed59838a2 Mon Sep 17 00:00:00 2001 From: buffalu <85544055+buffalu@users.noreply.github.com> Date: Sat, 11 Jun 2022 18:39:10 -0500 Subject: [PATCH] Add beginning of relayer (#1) --- Cargo.toml | 3 +- core/src/fetch_stage.rs | 30 +++++------ core/src/staked_nodes_updater_service.rs | 12 ++--- core/src/tpu.rs | 26 ++++----- jito-protos/protos | 2 +- jito-protos/src/lib.rs | 4 ++ relayer/Cargo.toml | 20 +++++++ relayer/src/lib.rs | 3 ++ relayer/src/manager.rs | 1 + relayer/src/relayer.rs | 68 ++++++++++++++++++++++++ relayer/src/router.rs | 20 +++++++ rpc/src/load_balancer.rs | 65 ++++++++++++++++------ rustfmt.toml | 4 ++ transaction-relayer/Cargo.toml | 1 + transaction-relayer/src/main.rs | 50 +++++++++++++---- 15 files changed, 245 insertions(+), 64 deletions(-) create mode 100644 relayer/Cargo.toml create mode 100644 relayer/src/lib.rs create mode 100644 relayer/src/manager.rs create mode 100644 relayer/src/relayer.rs create mode 100644 relayer/src/router.rs create mode 100644 rustfmt.toml diff --git a/Cargo.toml b/Cargo.toml index 3e44cce9732533..87617b6ea97700 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ "core", "jito-protos", "transaction-relayer", - "rpc" + "rpc", + "relayer" ] diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 76a10d190dbf47..c87e6a72da74e7 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -1,22 +1,20 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. -use crossbeam_channel::RecvError; -use { - crossbeam_channel::RecvTimeoutError, - solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, - solana_sdk::packet::{Packet, PacketFlags}, - solana_streamer::streamer::{ - self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats, - }, - std::{ - net::UdpSocket, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread::{self, sleep, Builder, JoinHandle}, - time::Duration, +use std::{ + net::UdpSocket, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, }, + thread::{self, sleep, Builder, JoinHandle}, + time::Duration, +}; + +use crossbeam_channel::{RecvError, RecvTimeoutError}; +use solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}; +use solana_sdk::packet::{Packet, PacketFlags}; +use solana_streamer::streamer::{ + self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats, }; #[derive(Debug, thiserror::Error)] diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index a75264660fc93c..8b3abf485980d7 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -1,19 +1,18 @@ -use jito_rpc::load_balancer::LoadBalancer; -use log::{error, info}; -use solana_client::client_error; -use solana_client::rpc_response::RpcContactInfo; -use std::sync::Mutex; use std::{ collections::HashMap, net::IpAddr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, time::{Duration, Instant}, }; +use jito_rpc::load_balancer::LoadBalancer; +use log::error; +use solana_client::{client_error, rpc_response::RpcContactInfo}; + const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5); pub struct StakedNodesUpdaterService { @@ -81,6 +80,7 @@ impl StakedNodesUpdaterService { } }) .collect(); + *last_stakes = Instant::now(); Ok(true) } else { sleep(Duration::from_millis(1)); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 1554134223dcf8..c74f865b5999ca 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -1,24 +1,26 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! multi-stage transaction processing pipeline in software. -use crate::fetch_stage::FetchStage; -use crate::staked_nodes_updater_service::StakedNodesUpdaterService; +use std::{ + collections::HashMap, + net::{IpAddr, UdpSocket}, + sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, + thread, + thread::JoinHandle, +}; + use crossbeam_channel::{unbounded, Receiver}; use jito_rpc::load_balancer::LoadBalancer; -use solana_core::banking_stage::BankingPacketBatch; -use solana_core::find_packet_sender_stake_stage::FindPacketSenderStakeStage; -use solana_core::sigverify::TransactionSigVerifier; -use solana_core::sigverify_stage::SigVerifyStage; +use solana_core::{ + banking_stage::BankingPacketBatch, find_packet_sender_stake_stage::FindPacketSenderStakeStage, + sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, +}; use solana_sdk::signature::Keypair; use solana_streamer::quic::{ spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, }; -use std::collections::HashMap; -use std::net::{IpAddr, UdpSocket}; -use std::sync::atomic::AtomicBool; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; -use std::thread::JoinHandle; + +use crate::{fetch_stage::FetchStage, staked_nodes_updater_service::StakedNodesUpdaterService}; pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; diff --git a/jito-protos/protos b/jito-protos/protos index 25f1565a732bfe..3890dcd0e0dc64 160000 --- a/jito-protos/protos +++ b/jito-protos/protos @@ -1 +1 @@ -Subproject commit 25f1565a732bfe77c3e274e2e83f7e57d81febeb +Subproject commit 3890dcd0e0dc643bfd4e51eb66995c15fbcdac25 diff --git a/jito-protos/src/lib.rs b/jito-protos/src/lib.rs index 36c9a035c44ceb..fff6f9ce09bb7f 100644 --- a/jito-protos/src/lib.rs +++ b/jito-protos/src/lib.rs @@ -10,6 +10,10 @@ pub mod packet { tonic::include_proto!("packet"); } +pub mod relayer { + tonic::include_proto!("relayer"); +} + pub mod searcher { tonic::include_proto!("searcher"); } diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml new file mode 100644 index 00000000000000..a0d2a40b891505 --- /dev/null +++ b/relayer/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "jito-relayer" +version = "0.1.0" +description = "Relayer GRPC server" +authors = ["Jito Team "] +homepage = "https://jito.wtf/" +edition = "2021" +publish = false + +[dependencies] +crossbeam-channel = "0.5.4" +jito-protos = { path = "../jito-protos" } +log = "0.4.17" +solana-sdk = "1.10.24" +solana-client = "1.10.24" +solana-core = "1.10.24" +tokio-stream = "0.1.9" +tokio = "1.14.1" +tonic = "0.7.2" +uuid = { version = "1.1.2", features = ["v4"] } \ No newline at end of file diff --git a/relayer/src/lib.rs b/relayer/src/lib.rs new file mode 100644 index 00000000000000..877dcc903e3a1a --- /dev/null +++ b/relayer/src/lib.rs @@ -0,0 +1,3 @@ +mod manager; +pub mod relayer; +mod router; diff --git a/relayer/src/manager.rs b/relayer/src/manager.rs new file mode 100644 index 00000000000000..8b137891791fe9 --- /dev/null +++ b/relayer/src/manager.rs @@ -0,0 +1 @@ + diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs new file mode 100644 index 00000000000000..dad626cf11de23 --- /dev/null +++ b/relayer/src/relayer.rs @@ -0,0 +1,68 @@ +use std::time::Duration; + +use crossbeam_channel::Receiver; +use jito_protos::relayer::{ + relayer_service_server::RelayerService, HeartbeatResponse, HeartbeatSubscriptionRequest, + PacketSubscriptionRequest, PacketSubscriptionResponse, +}; +use log::error; +use solana_core::banking_stage::BankingPacketBatch; +use solana_sdk::clock::Slot; +use tokio::{sync::mpsc::channel, time::sleep}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; + +use crate::router::Router; + +pub struct Relayer { + router: Router, +} + +impl Relayer { + pub fn new( + slot_receiver: Receiver, + packet_receiver: Receiver, + ) -> Relayer { + let router = Router::new(slot_receiver, packet_receiver); + Relayer { router } + } +} + +#[tonic::async_trait] +impl RelayerService for Relayer { + type SubscribeHeartbeatStream = ReceiverStream>; + + async fn subscribe_heartbeat( + &self, + _request: Request, + ) -> Result, Status> { + let (sender, receiver) = channel(2); + + tokio::spawn(async move { + if let Err(e) = sender.send(Ok(HeartbeatResponse::default())).await { + error!("subscribe_heartbeat error sending response: {:?}", e); + } + sleep(Duration::from_millis(500)).await; + }); + + Ok(Response::new(ReceiverStream::new(receiver))) + } + + type SubscribePacketsStream = ReceiverStream>; + + async fn subscribe_packets( + &self, + _request: Request, + ) -> Result, Status> { + let (sender, receiver) = channel(100); + + tokio::spawn(async move { + if let Err(e) = sender.send(Ok(PacketSubscriptionResponse::default())).await { + error!("subscribe_packets error sending response: {:?}", e); + } + sleep(Duration::from_millis(500)).await; + }); + + Ok(Response::new(ReceiverStream::new(receiver))) + } +} diff --git a/relayer/src/router.rs b/relayer/src/router.rs new file mode 100644 index 00000000000000..d41a278a50efa3 --- /dev/null +++ b/relayer/src/router.rs @@ -0,0 +1,20 @@ +use crossbeam_channel::Receiver; +use solana_core::banking_stage::BankingPacketBatch; +use solana_sdk::clock::Slot; + +pub struct Router { + slot_receiver: Receiver, + packet_receiver: Receiver, +} + +impl Router { + pub fn new( + slot_receiver: Receiver, + packet_receiver: Receiver, + ) -> Router { + Router { + slot_receiver, + packet_receiver, + } + } +} diff --git a/rpc/src/load_balancer.rs b/rpc/src/load_balancer.rs index 88b54bd086db75..7c9510235ad372 100644 --- a/rpc/src/load_balancer.rs +++ b/rpc/src/load_balancer.rs @@ -1,15 +1,21 @@ -use crossbeam_channel::RecvTimeoutError; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + thread, + thread::{sleep, Builder, JoinHandle}, + time::Duration, +}; + +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use log::{error, info}; -use solana_client::pubsub_client::PubsubClient; -use solana_client::rpc_client::RpcClient; -use solana_sdk::clock::Slot; -use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; -use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::thread::{sleep, Builder, JoinHandle}; -use std::time::Duration; +use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient}; +use solana_sdk::{ + clock::Slot, + commitment_config::{CommitmentConfig, CommitmentLevel}, +}; pub struct LoadBalancer { // (http, websocket) @@ -19,7 +25,10 @@ pub struct LoadBalancer { } impl LoadBalancer { - pub fn new(servers: &[(String, String)], exit: &Arc) -> LoadBalancer { + pub fn new( + servers: &[(String, String)], + exit: &Arc, + ) -> (LoadBalancer, Receiver) { let server_to_slot = HashMap::from_iter(servers.iter().map(|(_, ws)| (ws.clone(), 0))); let server_to_slot = Arc::new(Mutex::new(server_to_slot)); @@ -40,17 +49,22 @@ impl LoadBalancer { })); let server_to_rpc_client = Arc::new(Mutex::new(server_to_rpc_client)); + let (slot_sender, slot_receiver) = unbounded(); let subscription_threads = Self::start_subscription_threads( servers.to_vec(), &server_to_slot, &server_to_rpc_client, exit, + slot_sender, ); - LoadBalancer { - server_to_slot, - server_to_rpc_client, - subscription_threads, - } + ( + LoadBalancer { + server_to_slot, + server_to_rpc_client, + subscription_threads, + }, + slot_receiver, + ) } fn start_subscription_threads( @@ -58,13 +72,18 @@ impl LoadBalancer { server_to_slot: &Arc>>, _server_to_rpc_client: &Arc>>>, exit: &Arc, + slot_sender: Sender, ) -> Vec> { + let highest_slot = Arc::new(Mutex::new(Slot::default())); + servers .iter() .map(|(_, websocket_url)| { let exit = exit.clone(); let websocket_url = websocket_url.clone(); let server_to_slot = server_to_slot.clone(); + let slot_sender = slot_sender.clone(); + let highest_slot = highest_slot.clone(); Builder::new() .name(format_args!("rpc-thread({})", websocket_url).to_string()) @@ -85,6 +104,18 @@ impl LoadBalancer { .lock() .unwrap() .insert(websocket_url.clone(), slot.slot); + { + let mut highest_slot_l = + highest_slot.lock().unwrap(); + if slot.slot > *highest_slot_l { + *highest_slot_l = slot.slot; + if let Err(e) = slot_sender.send(slot.slot) + { + error!("error sending slot: {}", e); + break; + } + } + } } Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => { diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 00000000000000..79ce252ca3420f --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,4 @@ +edition = "2018" # required by rust-analyzer +imports_granularity="Crate" +format_code_in_doc_comments = true +group_imports = "StdExternalCrate" diff --git a/transaction-relayer/Cargo.toml b/transaction-relayer/Cargo.toml index 4a1dd145128c23..c9f91dc630794f 100644 --- a/transaction-relayer/Cargo.toml +++ b/transaction-relayer/Cargo.toml @@ -16,6 +16,7 @@ itertools = "0.10.3" jito-core = { path = "../core" } jito-protos = { path = "../jito-protos" } jito-rpc = { path = "../rpc" } +jito-relayer = { path = "../relayer" } solana-metrics = "1.10.24" solana-net-utils = "1.10.24" solana-perf = "1.10.24" diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index b5813436af74af..ae8f91188c0266 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -1,15 +1,21 @@ +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, +}; + use clap::Parser; use jito_core::tpu::{Tpu, TpuSockets}; +use jito_protos::relayer::relayer_service_server::RelayerServiceServer; +use jito_relayer::relayer::Relayer; use jito_rpc::load_balancer::LoadBalancer; -use log::info; use solana_net_utils::multi_bind_in_range; use solana_sdk::signature::{Keypair, Signer}; -use std::net::IpAddr; -use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::sleep; -use std::time::Duration; +use tokio::runtime::Builder; +use tonic::transport::Server; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -36,11 +42,11 @@ struct Args { /// Bind IP address for GRPC server #[clap(long, env, default_value_t = IpAddr::from_str("0.0.0.0").unwrap())] - server_bind_ip: IpAddr, + grpc_bind_ip: IpAddr, /// Bind port address for GRPC server #[clap(long, env, default_value_t = 42069)] - server_bind_port: u16, + grpc_bind_port: u16, /// Number of TPU threads #[clap(long, env, default_value_t = 32)] @@ -123,12 +129,21 @@ fn main() { let exit = Arc::new(AtomicBool::new(false)); + assert_eq!( + args.rpc_servers.len(), + args.websocket_servers.len(), + "num rpc servers = num websocket servers" + ); + assert!(args.rpc_servers.len() >= 1, "num rpc servers >= 1"); + let servers: Vec<(String, String)> = args .rpc_servers .into_iter() .zip(args.websocket_servers.into_iter()) .collect(); - let rpc_load_balancer = Arc::new(Mutex::new(LoadBalancer::new(&servers, &exit))); + + let (rpc_load_balancer, slot_receiver) = LoadBalancer::new(&servers, &exit); + let rpc_load_balancer = Arc::new(Mutex::new(rpc_load_balancer)); let (tpu, packet_receiver) = Tpu::new( sockets.tpu_sockets, @@ -140,7 +155,20 @@ fn main() { &rpc_load_balancer, ); - sleep(Duration::from_secs(20)); + let rt = Builder::new_multi_thread().enable_all().build().unwrap(); + rt.block_on(async { + let addr = SocketAddr::new(args.grpc_bind_ip, args.grpc_bind_port); + println!("Relayer listening on: {}", addr); + + let relayer = Relayer::new(slot_receiver, packet_receiver); + + let svc = RelayerServiceServer::new(relayer); + Server::builder() + .add_service(svc) + .serve(addr) + .await + .expect("serve server"); + }); exit.store(true, Ordering::Relaxed); tpu.join().unwrap();