Skip to content

Commit

Permalink
chore: add client side auth (solana-labs#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
segfaultdoc authored Jul 30, 2022
1 parent a7155fb commit 4c778cd
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 35 deletions.
182 changes: 182 additions & 0 deletions block_engine/src/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use std::{cell::RefCell, rc::Rc, sync::Arc};

use jito_protos::block_engine::{
block_engine_relayer_client::BlockEngineRelayerClient, AccountsOfInterestRequest,
AccountsOfInterestUpdate, PacketBatchUpdate, SubscribeExpiringPacketsResponse,
};
use solana_sdk::signature::{Keypair, Signer};
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{
codegen::InterceptedService, metadata::MetadataValue, service::Interceptor, transport::Channel,
Code, Response, Status, Streaming,
};

// Auth header keys
pub const MESSAGE_BIN: &str = "message-bin";
pub const PUBKEY_BIN: &str = "public-key-bin";
pub const SIGNATURE_BIN: &str = "signature-bin";

/// Intercepts requests and adds the necessary headers for auth.
#[derive(Clone)]
pub struct AuthInterceptor {
/// Used to sign the server generated token.
keypair: Arc<Keypair>,
token: Rc<RefCell<String>>,
}

impl AuthInterceptor {
pub fn new(keypair: Arc<Keypair>, token: Rc<RefCell<String>>) -> Self {
AuthInterceptor { keypair, token }
}

pub fn should_retry(
status: &Status,
token: Rc<RefCell<String>>,
max_retries: usize,
n_retries: usize,
) -> bool {
if max_retries == n_retries {
return false;
}

let mut token = token.borrow_mut();
if let Some(new_token) = Self::maybe_new_auth_token(status, &token) {
*token = new_token;
true
} else {
false
}
}

/// Checks to see if the server returned a token to be signed and if it does not equal the current
/// token then the new token is returned and authentication can be retried.
fn maybe_new_auth_token(status: &Status, current_token: &str) -> Option<String> {
if status.code() != Code::Unauthenticated {
return None;
}

let msg = status.message().split_whitespace().collect::<Vec<&str>>();
if msg.len() != 2 {
return None;
}

if msg[0] != "token:" {
return None;
}

if msg[1] != current_token {
Some(msg[1].to_string())
} else {
None
}
}
}

impl Interceptor for AuthInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
// Prefix with pubkey and hash it in order to ensure BlockEngine doesn't have us sign a malicious transaction.
let token = format!("{}-{}", self.keypair.pubkey(), self.token.take(),);
let hashed_token = solana_sdk::hash::hash(token.as_bytes());

request.metadata_mut().append_bin(
PUBKEY_BIN,
MetadataValue::from_bytes(&self.keypair.pubkey().to_bytes()),
);
request.metadata_mut().append_bin(
MESSAGE_BIN,
MetadataValue::from_bytes(hashed_token.to_bytes().as_slice()),
);
request.metadata_mut().append_bin(
SIGNATURE_BIN,
MetadataValue::from_bytes(
self.keypair
.sign_message(hashed_token.to_bytes().as_slice())
.as_ref(),
),
);

Ok(request)
}
}

/// Wrapper client that takes care of extracting the auth challenge and retrying requests.
pub struct AuthClient {
inner: BlockEngineRelayerClient<InterceptedService<Channel, AuthInterceptor>>,
token: Rc<RefCell<String>>,
max_retries: usize,
}

impl AuthClient {
pub fn new(
inner: BlockEngineRelayerClient<InterceptedService<Channel, AuthInterceptor>>,
max_retries: usize,
) -> Self {
let token = Rc::new(RefCell::new(String::default()));
Self {
inner,
token,
max_retries,
}
}

pub async fn subscribe_accounts_of_interest(
&mut self,
req: AccountsOfInterestRequest,
) -> Result<Response<Streaming<AccountsOfInterestUpdate>>, Status> {
let mut n_retries = 0;
loop {
return match self.inner.subscribe_accounts_of_interest(req.clone()).await {
Ok(resp) => Ok(resp),
Err(status) => {
if AuthInterceptor::should_retry(
&status,
self.token.clone(),
self.max_retries,
n_retries,
) {
n_retries += 1;
continue;
}
Err(status)
}
};
}
}

pub async fn start_expiring_packet_stream(
&mut self,
buffer: usize,
) -> Result<
(
Sender<PacketBatchUpdate>,
Response<SubscribeExpiringPacketsResponse>,
),
Status,
> {
let mut n_retries = 0;
loop {
let (tx, rx) = channel::<PacketBatchUpdate>(buffer);
let receiver_stream = ReceiverStream::new(rx);
return match self
.inner
.start_expiring_packet_stream(receiver_stream)
.await
{
Ok(resp) => Ok((tx, resp)),
Err(status) => {
if AuthInterceptor::should_retry(
&status,
self.token.clone(),
self.max_retries,
n_retries,
) {
n_retries += 1;
continue;
}
Err(status)
}
};
}
}
}
48 changes: 13 additions & 35 deletions block_engine/src/block_engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{
cell::RefCell,
collections::{hash_map::RandomState, HashSet},
rc::Rc,
str::FromStr,
sync::Arc,
thread,
Expand All @@ -20,37 +22,17 @@ use jito_protos::{
use log::{error, *};
use prost_types::Timestamp;
use solana_perf::packet::PacketBatch;
use solana_sdk::{pubkey::Pubkey, signature::Signer, signer::keypair::Keypair};
use solana_sdk::{pubkey::Pubkey, signer::keypair::Keypair};
use thiserror::Error;
use tokio::{
runtime::Runtime,
select,
sync::mpsc::{channel, Receiver, Sender},
sync::mpsc::{Receiver, Sender},
time::{interval, sleep},
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{
codegen::InterceptedService,
metadata::AsciiMetadataValue,
service::Interceptor,
transport::{Channel, Endpoint},
IntoStreamingRequest, Request, Response, Status, Streaming,
};

#[derive(Clone)]
pub struct AuthenticationInterceptor {
pub keypair: Arc<Keypair>,
}
use tonic::{transport::Endpoint, Response, Status, Streaming};

impl Interceptor for AuthenticationInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
let pubkey = AsciiMetadataValue::try_from(self.keypair.pubkey().to_string()).unwrap();
// TODO (LB): add better authentication here by signing a message with keypair
// Block Engine expects pubkey to be set
request.metadata_mut().insert("pubkey", pubkey);
Ok(request)
}
}
use crate::auth::{AuthClient, AuthInterceptor};

pub struct BlockEnginePackets {
pub packet_batches: Vec<PacketBatch>,
Expand Down Expand Up @@ -112,17 +94,20 @@ impl BlockEngineRelayerHandler {
.spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let auth_interceptor = AuthenticationInterceptor { keypair };
let auth_interceptor =
AuthInterceptor::new(keypair, Rc::new(RefCell::new(String::default())));
loop {
sleep(Duration::from_secs(1)).await;

info!("connecting to block engine at url: {:?}", block_engine_url);
match endpoint.connect().await {
Ok(channel) => {
let mut client = BlockEngineRelayerClient::with_interceptor(
let client = BlockEngineRelayerClient::with_interceptor(
channel,
auth_interceptor.clone(),
);
let mut client = AuthClient::new(client, 3);

match Self::start_event_loop(
&mut client,
&mut block_engine_receiver,
Expand Down Expand Up @@ -151,20 +136,13 @@ impl BlockEngineRelayerHandler {
/// If there's a missed heartbeat or any issues responding to each other, they'll disconnect and
/// try to re-establish connection
async fn start_event_loop(
client: &mut BlockEngineRelayerClient<
InterceptedService<Channel, AuthenticationInterceptor>,
>,
client: &mut AuthClient,
block_engine_receiver: &mut Receiver<BlockEnginePackets>,
) -> BlockEngineResult<()> {
let (packet_msg_sender, packet_msg_receiver) = channel::<PacketBatchUpdate>(100);
let receiver_stream = ReceiverStream::new(packet_msg_receiver);

let subscribe_aoi_stream = client
.subscribe_accounts_of_interest(AccountsOfInterestRequest {})
.await?;
let _response = client
.start_expiring_packet_stream(receiver_stream.into_streaming_request())
.await?;
let (packet_msg_sender, _response) = client.start_expiring_packet_stream(100).await?;

Self::handle_packet_stream(
packet_msg_sender,
Expand Down
1 change: 1 addition & 0 deletions block_engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod auth;
pub mod block_engine;

0 comments on commit 4c778cd

Please sign in to comment.