Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quic Proxy: cleanup/consolidate tests, handle flaky tests, add explicit connection close #169

Merged
merged 8 commits into from
Aug 29, 2023
10 changes: 8 additions & 2 deletions core/src/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl BlockProcessor {

let Some(transactions) = block.transactions else {
return Ok(BlockProcessorResult::invalid());
};
};

let blockhash = block.blockhash;
let parent_slot = block.parent_slot;
Expand All @@ -110,7 +110,13 @@ impl BlockProcessor {
let mut transaction_infos = vec![];
transaction_infos.reserve(transactions.len());
for tx in transactions {
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
let Some(UiTransactionStatusMeta {
err,
status,
compute_units_consumed,
..
}) = tx.meta
else {
info!("tx with no meta");
continue;
};
Expand Down
2 changes: 1 addition & 1 deletion lite-rpc/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl Postgres {
let Ok(session) = session else {
POSTGRES_SESSION_ERRORS.inc();

const TIME_OUT:Duration = Duration::from_millis(1000);
const TIME_OUT: Duration = Duration::from_millis(1000);
warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}");
tokio::time::sleep(TIME_OUT).await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ pub fn with_10000_transactions_direct() {
});
}

// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59)
#[ignore]
#[test]
pub fn with_10000_transactions_proxy() {
configure_logging(false);
Expand All @@ -155,6 +157,8 @@ pub fn with_10000_transactions_proxy() {
});
}

// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59)
#[ignore]
#[test]
pub fn many_transactions_proxy() {
configure_logging(false);
Expand Down
4 changes: 3 additions & 1 deletion quic-forward-proxy/src/outbound/tx_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ pub async fn tx_forwarder(
}
} // -- while all packtes from channel


auto_connection.force_shutdown().await;
warn!(
"Quic forwarder agent #{} for TPU {} exited",
"Quic forwarder agent #{} for TPU {} exited; shut down connection",
connection_idx, tpu_address
);
}); // -- spawned thread for one connection to one TPU
Expand Down
25 changes: 25 additions & 0 deletions quic-forward-proxy/src/quinn_auto_reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ enum ConnectionState {
Connection(Connection),
PermanentError,
FailedAttempt(u32),
ShuttingDown,
}

pub struct AutoReconnect {
Expand Down Expand Up @@ -66,6 +67,7 @@ impl AutoReconnect {
ConnectionState::Connection(conn) => Ok(conn.clone()),
ConnectionState::PermanentError => bail!("permanent error"),
ConnectionState::FailedAttempt(_) => bail!("failed connection attempt"),
ConnectionState::ShuttingDown => bail!("shutting down"),
}
}

Expand Down Expand Up @@ -172,6 +174,13 @@ impl AutoReconnect {
}
};
}
ConnectionState::ShuttingDown => {
// no nothing
debug!(
"Not using connection to {} that's shutting down",
self.target_address
);
}
}
}

Expand All @@ -194,6 +203,21 @@ impl AutoReconnect {
}
}

/// close connection without any sophisticated handling; assumes that send buffers were flushed by .send etc.
pub async fn force_shutdown(&self) {
let mut lock = self.current.write().await;
match &*lock {
ConnectionState::Connection(conn) => {
conn.close(0u32.into(), b"client_shutdown");
*lock = ConnectionState::ShuttingDown;
}
ConnectionState::NotConnected => *lock = ConnectionState::ShuttingDown,
ConnectionState::PermanentError => *lock = ConnectionState::ShuttingDown,
ConnectionState::FailedAttempt(_) => *lock = ConnectionState::ShuttingDown,
ConnectionState::ShuttingDown => *lock = ConnectionState::ShuttingDown,
}
}

// stable_id 140266619216912, rtt=2.156683ms,
// stats FrameStats { ACK: 3, CONNECTION_CLOSE: 0, CRYPTO: 3,
// DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1, MAX_DATA: 0,
Expand All @@ -213,6 +237,7 @@ impl AutoReconnect {
ConnectionState::NotConnected => "n/c".to_string(),
ConnectionState::PermanentError => "n/a (permanent)".to_string(),
ConnectionState::FailedAttempt(_) => "fail".to_string(),
ConnectionState::ShuttingDown => "shutdown".to_string(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion services/src/prometheus_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl PrometheusSync {
let listener = TcpListener::bind(addr).await?;

loop {
let Ok((mut stream, _addr)) = listener.accept().await else {
let Ok((mut stream, _addr)) = listener.accept().await else {
error!("Error accepting prometheus stream");
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
Expand Down
6 changes: 5 additions & 1 deletion services/src/transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ impl TransactionService {
};
let signature = tx.signatures[0];

let Some(BlockInformation { slot, last_valid_blockheight, .. }) = self
let Some(BlockInformation {
slot,
last_valid_blockheight,
..
}) = self
.block_store
.get_block_info(&tx.get_recent_blockhash().to_string())
else {
Expand Down
Loading