Skip to content

Commit

Permalink
Add beginning of relayer (solana-labs#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu committed Jun 11, 2022
1 parent f524f47 commit 3e8fbfb
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 64 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ members = [
"core",
"jito-protos",
"transaction-relayer",
"rpc"
"rpc",
"relayer"
]
30 changes: 14 additions & 16 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.

use crossbeam_channel::RecvError;
use {
crossbeam_channel::RecvTimeoutError,
solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
solana_sdk::packet::{Packet, PacketFlags},
solana_streamer::streamer::{
self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats,
},
std::{
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
use std::{
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
};

use crossbeam_channel::{RecvError, RecvTimeoutError};
use solana_perf::{packet::PacketBatchRecycler, recycler::Recycler};
use solana_sdk::packet::{Packet, PacketFlags};
use solana_streamer::streamer::{
self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats,
};

#[derive(Debug, thiserror::Error)]
Expand Down
12 changes: 6 additions & 6 deletions core/src/staked_nodes_updater_service.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use jito_rpc::load_balancer::LoadBalancer;
use log::{error, info};
use solana_client::client_error;
use solana_client::rpc_response::RpcContactInfo;
use std::sync::Mutex;
use std::{
collections::HashMap,
net::IpAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
Arc, Mutex, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
};

use jito_rpc::load_balancer::LoadBalancer;
use log::error;
use solana_client::{client_error, rpc_response::RpcContactInfo};

const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5);

pub struct StakedNodesUpdaterService {
Expand Down Expand Up @@ -81,6 +80,7 @@ impl StakedNodesUpdaterService {
}
})
.collect();
*last_stakes = Instant::now();
Ok(true)
} else {
sleep(Duration::from_millis(1));
Expand Down
26 changes: 14 additions & 12 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
//! The `tpu` module implements the Transaction Processing Unit, a
//! multi-stage transaction processing pipeline in software.

use crate::fetch_stage::FetchStage;
use crate::staked_nodes_updater_service::StakedNodesUpdaterService;
use std::{
collections::HashMap,
net::{IpAddr, UdpSocket},
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
thread,
thread::JoinHandle,
};

use crossbeam_channel::{unbounded, Receiver};
use jito_rpc::load_balancer::LoadBalancer;
use solana_core::banking_stage::BankingPacketBatch;
use solana_core::find_packet_sender_stake_stage::FindPacketSenderStakeStage;
use solana_core::sigverify::TransactionSigVerifier;
use solana_core::sigverify_stage::SigVerifyStage;
use solana_core::{
banking_stage::BankingPacketBatch, find_packet_sender_stake_stage::FindPacketSenderStakeStage,
sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage,
};
use solana_sdk::signature::Keypair;
use solana_streamer::quic::{
spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
};
use std::collections::HashMap;
use std::net::{IpAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::thread::JoinHandle;

use crate::{fetch_stage::FetchStage, staked_nodes_updater_service::StakedNodesUpdaterService};

pub const DEFAULT_TPU_COALESCE_MS: u64 = 5;

Expand Down
2 changes: 1 addition & 1 deletion jito-protos/protos
Submodule protos updated from 25f156 to 3890dc
4 changes: 4 additions & 0 deletions jito-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ pub mod packet {
tonic::include_proto!("packet");
}

pub mod relayer {
tonic::include_proto!("relayer");
}

pub mod searcher {
tonic::include_proto!("searcher");
}
Expand Down
20 changes: 20 additions & 0 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "jito-relayer"
version = "0.1.0"
description = "Relayer GRPC server"
authors = ["Jito Team <team@jito.wtf>"]
homepage = "https://jito.wtf/"
edition = "2021"
publish = false

[dependencies]
crossbeam-channel = "0.5.4"
jito-protos = { path = "../jito-protos" }
log = "0.4.17"
solana-sdk = "1.10.24"
solana-client = "1.10.24"
solana-core = "1.10.24"
tokio-stream = "0.1.9"
tokio = "1.14.1"
tonic = "0.7.2"
uuid = { version = "1.1.2", features = ["v4"] }
3 changes: 3 additions & 0 deletions relayer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod manager;
pub mod relayer;
mod router;
1 change: 1 addition & 0 deletions relayer/src/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

68 changes: 68 additions & 0 deletions relayer/src/relayer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::time::Duration;

use crossbeam_channel::Receiver;
use jito_protos::relayer::{
relayer_service_server::RelayerService, HeartbeatResponse, HeartbeatSubscriptionRequest,
PacketSubscriptionRequest, PacketSubscriptionResponse,
};
use log::error;
use solana_core::banking_stage::BankingPacketBatch;
use solana_sdk::clock::Slot;
use tokio::{sync::mpsc::channel, time::sleep};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

use crate::router::Router;

pub struct Relayer {
router: Router,
}

impl Relayer {
pub fn new(
slot_receiver: Receiver<Slot>,
packet_receiver: Receiver<BankingPacketBatch>,
) -> Relayer {
let router = Router::new(slot_receiver, packet_receiver);
Relayer { router }
}
}

#[tonic::async_trait]
impl RelayerService for Relayer {
type SubscribeHeartbeatStream = ReceiverStream<Result<HeartbeatResponse, Status>>;

async fn subscribe_heartbeat(
&self,
_request: Request<HeartbeatSubscriptionRequest>,
) -> Result<Response<Self::SubscribeHeartbeatStream>, Status> {
let (sender, receiver) = channel(2);

tokio::spawn(async move {
if let Err(e) = sender.send(Ok(HeartbeatResponse::default())).await {
error!("subscribe_heartbeat error sending response: {:?}", e);
}
sleep(Duration::from_millis(500)).await;
});

Ok(Response::new(ReceiverStream::new(receiver)))
}

type SubscribePacketsStream = ReceiverStream<Result<PacketSubscriptionResponse, Status>>;

async fn subscribe_packets(
&self,
_request: Request<PacketSubscriptionRequest>,
) -> Result<Response<Self::SubscribePacketsStream>, Status> {
let (sender, receiver) = channel(100);

tokio::spawn(async move {
if let Err(e) = sender.send(Ok(PacketSubscriptionResponse::default())).await {
error!("subscribe_packets error sending response: {:?}", e);
}
sleep(Duration::from_millis(500)).await;
});

Ok(Response::new(ReceiverStream::new(receiver)))
}
}
20 changes: 20 additions & 0 deletions relayer/src/router.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crossbeam_channel::Receiver;
use solana_core::banking_stage::BankingPacketBatch;
use solana_sdk::clock::Slot;

pub struct Router {
slot_receiver: Receiver<Slot>,
packet_receiver: Receiver<BankingPacketBatch>,
}

impl Router {
pub fn new(
slot_receiver: Receiver<Slot>,
packet_receiver: Receiver<BankingPacketBatch>,
) -> Router {
Router {
slot_receiver,
packet_receiver,
}
}
}
65 changes: 48 additions & 17 deletions rpc/src/load_balancer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use crossbeam_channel::RecvTimeoutError;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread,
thread::{sleep, Builder, JoinHandle},
time::Duration,
};

use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use log::{error, info};
use solana_client::pubsub_client::PubsubClient;
use solana_client::rpc_client::RpcClient;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient};
use solana_sdk::{
clock::Slot,
commitment_config::{CommitmentConfig, CommitmentLevel},
};

pub struct LoadBalancer {
// (http, websocket)
Expand All @@ -19,7 +25,10 @@ pub struct LoadBalancer {
}

impl LoadBalancer {
pub fn new(servers: &[(String, String)], exit: &Arc<AtomicBool>) -> LoadBalancer {
pub fn new(
servers: &[(String, String)],
exit: &Arc<AtomicBool>,
) -> (LoadBalancer, Receiver<Slot>) {
let server_to_slot = HashMap::from_iter(servers.iter().map(|(_, ws)| (ws.clone(), 0)));
let server_to_slot = Arc::new(Mutex::new(server_to_slot));

Expand All @@ -40,31 +49,41 @@ impl LoadBalancer {
}));
let server_to_rpc_client = Arc::new(Mutex::new(server_to_rpc_client));

let (slot_sender, slot_receiver) = unbounded();
let subscription_threads = Self::start_subscription_threads(
servers.to_vec(),
&server_to_slot,
&server_to_rpc_client,
exit,
slot_sender,
);
LoadBalancer {
server_to_slot,
server_to_rpc_client,
subscription_threads,
}
(
LoadBalancer {
server_to_slot,
server_to_rpc_client,
subscription_threads,
},
slot_receiver,
)
}

fn start_subscription_threads(
servers: Vec<(String, String)>,
server_to_slot: &Arc<Mutex<HashMap<String, Slot>>>,
_server_to_rpc_client: &Arc<Mutex<HashMap<String, Arc<RpcClient>>>>,
exit: &Arc<AtomicBool>,
slot_sender: Sender<Slot>,
) -> Vec<JoinHandle<()>> {
let highest_slot = Arc::new(Mutex::new(Slot::default()));

servers
.iter()
.map(|(_, websocket_url)| {
let exit = exit.clone();
let websocket_url = websocket_url.clone();
let server_to_slot = server_to_slot.clone();
let slot_sender = slot_sender.clone();
let highest_slot = highest_slot.clone();

Builder::new()
.name(format_args!("rpc-thread({})", websocket_url).to_string())
Expand All @@ -85,6 +104,18 @@ impl LoadBalancer {
.lock()
.unwrap()
.insert(websocket_url.clone(), slot.slot);
{
let mut highest_slot_l =
highest_slot.lock().unwrap();
if slot.slot > *highest_slot_l {
*highest_slot_l = slot.slot;
if let Err(e) = slot_sender.send(slot.slot)
{
error!("error sending slot: {}", e);
break;
}
}
}
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => {
Expand Down
4 changes: 4 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
edition = "2018" # required by rust-analyzer
imports_granularity="Crate"
format_code_in_doc_comments = true
group_imports = "StdExternalCrate"
1 change: 1 addition & 0 deletions transaction-relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ itertools = "0.10.3"
jito-core = { path = "../core" }
jito-protos = { path = "../jito-protos" }
jito-rpc = { path = "../rpc" }
jito-relayer = { path = "../relayer" }
solana-metrics = "1.10.24"
solana-net-utils = "1.10.24"
solana-perf = "1.10.24"
Expand Down
Loading

0 comments on commit 3e8fbfb

Please sign in to comment.