diff --git a/core/src/block_processor.rs b/core/src/block_processor.rs index 86d80b36..b9239f0f 100644 --- a/core/src/block_processor.rs +++ b/core/src/block_processor.rs @@ -86,7 +86,7 @@ impl BlockProcessor { let Some(transactions) = block.transactions else { return Ok(BlockProcessorResult::invalid()); - }; + }; let blockhash = block.blockhash; let parent_slot = block.parent_slot; @@ -110,7 +110,13 @@ impl BlockProcessor { let mut transaction_infos = vec![]; transaction_infos.reserve(transactions.len()); for tx in transactions { - let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else { + let Some(UiTransactionStatusMeta { + err, + status, + compute_units_consumed, + .. + }) = tx.meta + else { info!("tx with no meta"); continue; }; diff --git a/lite-rpc/src/postgres.rs b/lite-rpc/src/postgres.rs index dd920833..a6e84ff5 100644 --- a/lite-rpc/src/postgres.rs +++ b/lite-rpc/src/postgres.rs @@ -480,7 +480,7 @@ impl Postgres { let Ok(session) = session else { POSTGRES_SESSION_ERRORS.inc(); - const TIME_OUT:Duration = Duration::from_millis(1000); + const TIME_OUT: Duration = Duration::from_millis(1000); warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}"); tokio::time::sleep(TIME_OUT).await; diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index e52e513b..825936c4 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -144,6 +144,8 @@ pub fn with_10000_transactions_direct() { }); } +// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) +#[ignore] #[test] pub fn with_10000_transactions_proxy() { configure_logging(false); @@ -155,6 +157,8 @@ pub fn with_10000_transactions_proxy() { }); } +// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59) +#[ignore] #[test] pub fn many_transactions_proxy() { configure_logging(false); diff --git a/quic-forward-proxy/src/outbound/tx_forward.rs b/quic-forward-proxy/src/outbound/tx_forward.rs index d7224953..fb9f7a4f 100644 --- a/quic-forward-proxy/src/outbound/tx_forward.rs +++ b/quic-forward-proxy/src/outbound/tx_forward.rs @@ -181,8 +181,10 @@ pub async fn tx_forwarder( } } // -- while all packtes from channel + + auto_connection.force_shutdown().await; warn!( - "Quic forwarder agent #{} for TPU {} exited", + "Quic forwarder agent #{} for TPU {} exited; shut down connection", connection_idx, tpu_address ); }); // -- spawned thread for one connection to one TPU diff --git a/quic-forward-proxy/src/quinn_auto_reconnect.rs b/quic-forward-proxy/src/quinn_auto_reconnect.rs index 24ed35af..14c0120c 100644 --- a/quic-forward-proxy/src/quinn_auto_reconnect.rs +++ b/quic-forward-proxy/src/quinn_auto_reconnect.rs @@ -25,6 +25,7 @@ enum ConnectionState { Connection(Connection), PermanentError, FailedAttempt(u32), + ShuttingDown, } pub struct AutoReconnect { @@ -66,6 +67,7 @@ impl AutoReconnect { ConnectionState::Connection(conn) => Ok(conn.clone()), ConnectionState::PermanentError => bail!("permanent error"), ConnectionState::FailedAttempt(_) => bail!("failed connection attempt"), + ConnectionState::ShuttingDown => bail!("shutting down"), } } @@ -172,6 +174,13 @@ impl AutoReconnect { } }; } + ConnectionState::ShuttingDown => { + // no nothing + debug!( + "Not using connection to {} that's shutting down", + self.target_address + ); + } } } @@ -194,6 +203,21 @@ impl AutoReconnect { } } + /// close connection without any sophisticated handling; assumes that send buffers were flushed by .send etc. + pub async fn force_shutdown(&self) { + let mut lock = self.current.write().await; + match &*lock { + ConnectionState::Connection(conn) => { + conn.close(0u32.into(), b"client_shutdown"); + *lock = ConnectionState::ShuttingDown; + } + ConnectionState::NotConnected => *lock = ConnectionState::ShuttingDown, + ConnectionState::PermanentError => *lock = ConnectionState::ShuttingDown, + ConnectionState::FailedAttempt(_) => *lock = ConnectionState::ShuttingDown, + ConnectionState::ShuttingDown => *lock = ConnectionState::ShuttingDown, + } + } + // stable_id 140266619216912, rtt=2.156683ms, // stats FrameStats { ACK: 3, CONNECTION_CLOSE: 0, CRYPTO: 3, // DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1, MAX_DATA: 0, @@ -213,6 +237,7 @@ impl AutoReconnect { ConnectionState::NotConnected => "n/c".to_string(), ConnectionState::PermanentError => "n/a (permanent)".to_string(), ConnectionState::FailedAttempt(_) => "fail".to_string(), + ConnectionState::ShuttingDown => "shutdown".to_string(), } } } diff --git a/services/src/prometheus_sync.rs b/services/src/prometheus_sync.rs index 3a8a9973..fad8a378 100644 --- a/services/src/prometheus_sync.rs +++ b/services/src/prometheus_sync.rs @@ -44,7 +44,7 @@ impl PrometheusSync { let listener = TcpListener::bind(addr).await?; loop { - let Ok((mut stream, _addr)) = listener.accept().await else { + let Ok((mut stream, _addr)) = listener.accept().await else { error!("Error accepting prometheus stream"); tokio::time::sleep(Duration::from_millis(1)).await; continue; diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index 8cacf59e..f577f76b 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -155,7 +155,11 @@ impl TransactionService { }; let signature = tx.signatures[0]; - let Some(BlockInformation { slot, last_valid_blockheight, .. }) = self + let Some(BlockInformation { + slot, + last_valid_blockheight, + .. + }) = self .block_store .get_block_info(&tx.get_recent_blockhash().to_string()) else {