diff --git a/cluster-endpoints/examples/grpcmultiplex_fastestwins.rs b/cluster-endpoints/examples/grpcmultiplex_fastestwins.rs index e9db4de7..d4c40c9b 100644 --- a/cluster-endpoints/examples/grpcmultiplex_fastestwins.rs +++ b/cluster-endpoints/examples/grpcmultiplex_fastestwins.rs @@ -14,9 +14,10 @@ use tokio::{select}; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::task::{JoinHandle, JoinSet}; use tokio::time::{sleep, Duration, timeout, Instant, sleep_until}; -use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::tonic::Status; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block}; @@ -40,12 +41,13 @@ pub async fn main() { let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(1000); - let green_config = GrpcSourceConfig::new("green".to_string(), grpc_addr_mainnet_triton, None); - let blue_config = GrpcSourceConfig::new("blue".to_string(), grpc_addr_mainnet_ams81, None); + let green_config = GrpcSourceConfig::new("triton".to_string(), grpc_addr_mainnet_triton, None); + let blue_config = GrpcSourceConfig::new("mangoams81".to_string(), grpc_addr_mainnet_ams81, None); let toxiproxy_config = GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None); create_multiplex( - vec![green_config, blue_config, toxiproxy_config], + // vec![green_config, blue_config, toxiproxy_config], + vec![blue_config], CommitmentLevel::Confirmed, block_sx); @@ -105,16 +107,13 @@ fn create_multiplex( let mut start_stream33 = false; 'main_loop: loop { - let block_cmd = select! { - message = futures.next() => { - match message { - Some(message) => { - map_filter_block_message(current_slot, message, commitment_config) - } - None => { - panic!("source stream is not supposed to terminate"); - } - } + let message = futures.next().await; + let block_cmd = match message { + Some(message) => { + map_filter_block_message(current_slot, message, commitment_config) + } + None => { + panic!("source stream is not supposed to terminate"); } }; @@ -182,80 +181,115 @@ impl GrpcSourceConfig { } } + +enum ConnectionState>> { + NotConnected, + Connecting(JoinHandle>), + Ready(S), + WaitReconnect, +} + + // TODO use GrpcSource // note: stream never terminates async fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, commitment_level: CommitmentLevel) -> impl Stream { let label = grpc_source.label.clone(); + + // NOT_CONNECTED; CONNECTING + let mut state = ConnectionState::NotConnected; + + // in case of cancellation, we restart from here: + // thus we want to keep the progression in a state object outside the stream! makro stream! { - let mut throttle_barrier = Instant::now(); - 'reconnect_loop: loop { - sleep_until(throttle_barrier).await; - throttle_barrier = Instant::now().add(Duration::from_millis(1000)); - - let connect_result = GeyserGrpcClient::connect_with_timeout( - grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(), - Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await; - - let mut client = match connect_result { - Ok(connected_client) => connected_client, - Err(geyser_grpc_client_error) => { - // TODO identify non-recoverable errors and cancel stream - warn!("Connect failed on {} - retrying: {:?}", label, geyser_grpc_client_error); - continue 'reconnect_loop; - } - }; + state = match state { + ConnectionState::NotConnected => { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - let subscribe_result = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - Default::default(), - Some(commitment_level), - Default::default(), - None, - ).await; - - let geyser_stream = match subscribe_result { - Ok(subscribed_stream) => subscribed_stream, - Err(geyser_grpc_client_error) => { - // TODO identify non-recoverable errors and cancel stream - warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_grpc_client_error); - continue 'reconnect_loop; - } - }; + let connection_task = tokio::spawn(async move { - for await update_message in geyser_stream { - match update_message { - Ok(update_message) => { - info!(">message on {}", label); - yield update_message; - } - Err(tonic_status) => { - // TODO identify non-recoverable errors and cancel stream - warn!("Receive error on {} - retrying: {:?}", label, tonic_status); - continue 'reconnect_loop; + let connect_result = GeyserGrpcClient::connect_with_timeout( + grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(), + Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await; + + + let mut client = connect_result?; + + // Connected; + + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); + + let subscribe_result = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + Default::default(), + Some(commitment_level), + Default::default(), + None, + ).await; + + + subscribe_result + }); + + ConnectionState::Connecting(connection_task) + } + ConnectionState::Connecting(connection_task) => { + let subscribe_result = connection_task.await; + + match subscribe_result { + Ok(Ok(subscribed_stream)) => ConnectionState::Ready(subscribed_stream), + Ok(Err(geyser_error)) => { + // TODO identify non-recoverable errors and cancel stream + warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error); + ConnectionState::WaitReconnect + }, + Err(geyser_grpc_task_error) => { + panic!("Task aborted - should not happen"); } } - } // -- production loop - warn!("stream consumer loop terminated for {}", label); - } // -- main loop + } + ConnectionState::Ready(geyser_stream) => { + + for await update_message in geyser_stream { + match update_message { + Ok(update_message) => { + info!(">message on {}", label); + yield update_message; + } + Err(tonic_status) => { + // TODO identify non-recoverable errors and cancel stream + warn!("Receive error on {} - retrying: {:?}", label, tonic_status); + break; + } + } + } // -- production loop + + warn!("Geyser stream exhausted: {:?}", label); + ConnectionState::WaitReconnect + + } + ConnectionState::WaitReconnect => { + // TODO implement backoff + sleep(Duration::from_secs(1)).await; + ConnectionState::NotConnected + } + }; // -- match + } // -- stream! }