Skip to content

Commit

Permalink
WIP future
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 12, 2023
1 parent f48fd85 commit 5bea96c
Showing 1 changed file with 110 additions and 76 deletions.
186 changes: 110 additions & 76 deletions cluster-endpoints/examples/grpcmultiplex_fastestwins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use tokio::{select};
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::tonic::Status;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block};
Expand All @@ -40,12 +41,13 @@ pub async fn main() {

let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(1000);

let green_config = GrpcSourceConfig::new("green".to_string(), grpc_addr_mainnet_triton, None);
let blue_config = GrpcSourceConfig::new("blue".to_string(), grpc_addr_mainnet_ams81, None);
let green_config = GrpcSourceConfig::new("triton".to_string(), grpc_addr_mainnet_triton, None);
let blue_config = GrpcSourceConfig::new("mangoams81".to_string(), grpc_addr_mainnet_ams81, None);
let toxiproxy_config = GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None);

create_multiplex(
vec![green_config, blue_config, toxiproxy_config],
// vec![green_config, blue_config, toxiproxy_config],
vec![blue_config],
CommitmentLevel::Confirmed,
block_sx);

Expand Down Expand Up @@ -105,16 +107,13 @@ fn create_multiplex(
let mut start_stream33 = false;
'main_loop: loop {

let block_cmd = select! {
message = futures.next() => {
match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("source stream is not supposed to terminate");
}
}
let message = futures.next().await;
let block_cmd = match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("source stream is not supposed to terminate");
}
};

Expand Down Expand Up @@ -182,80 +181,115 @@ impl GrpcSourceConfig {
}
}


enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected,
Connecting(JoinHandle<GeyserGrpcClientResult<S>>),
Ready(S),
WaitReconnect,
}


// TODO use GrpcSource
// note: stream never terminates
async fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
commitment_level: CommitmentLevel) -> impl Stream<Item = SubscribeUpdate> {
let label = grpc_source.label.clone();

// NOT_CONNECTED; CONNECTING
let mut state = ConnectionState::NotConnected;

// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
stream! {
let mut throttle_barrier = Instant::now();
'reconnect_loop: loop {
sleep_until(throttle_barrier).await;
throttle_barrier = Instant::now().add(Duration::from_millis(1000));

let connect_result = GeyserGrpcClient::connect_with_timeout(
grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(),
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;

let mut client = match connect_result {
Ok(connected_client) => connected_client,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Connect failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'reconnect_loop;
}
};
state = match state {
ConnectionState::NotConnected => {

let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

let subscribe_result = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
).await;

let geyser_stream = match subscribe_result {
Ok(subscribed_stream) => subscribed_stream,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'reconnect_loop;
}
};
let connection_task = tokio::spawn(async move {

for await update_message in geyser_stream {
match update_message {
Ok(update_message) => {
info!(">message on {}", label);
yield update_message;
}
Err(tonic_status) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Receive error on {} - retrying: {:?}", label, tonic_status);
continue 'reconnect_loop;
let connect_result = GeyserGrpcClient::connect_with_timeout(
grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(),
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;


let mut client = connect_result?;

// Connected;

let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

let subscribe_result = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
).await;


subscribe_result
});

ConnectionState::Connecting(connection_task)
}
ConnectionState::Connecting(connection_task) => {
let subscribe_result = connection_task.await;

match subscribe_result {
Ok(Ok(subscribed_stream)) => ConnectionState::Ready(subscribed_stream),
Ok(Err(geyser_error)) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error);
ConnectionState::WaitReconnect
},
Err(geyser_grpc_task_error) => {
panic!("Task aborted - should not happen");
}
}
} // -- production loop

warn!("stream consumer loop terminated for {}", label);
} // -- main loop
}
ConnectionState::Ready(geyser_stream) => {

for await update_message in geyser_stream {
match update_message {
Ok(update_message) => {
info!(">message on {}", label);
yield update_message;
}
Err(tonic_status) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Receive error on {} - retrying: {:?}", label, tonic_status);
break;
}
}
} // -- production loop

warn!("Geyser stream exhausted: {:?}", label);
ConnectionState::WaitReconnect

}
ConnectionState::WaitReconnect => {
// TODO implement backoff
sleep(Duration::from_secs(1)).await;
ConnectionState::NotConnected
}
}; // -- match

} // -- stream!

}

0 comments on commit 5bea96c

Please sign in to comment.