Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports: Fix missing Poll::Ready(None) event from listenener #285

Merged
merged 8 commits into from
Nov 12, 2024
46 changes: 38 additions & 8 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,20 +378,34 @@ impl Transport for TcpTransport {
}

fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
let pending = self
.pending_inbound_connections
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let pending = self.pending_inbound_connections.remove(&connection_id).ok_or_else(|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot accept non existent pending connection",
);

Error::ConnectionDoesntExist(connection_id)
})?;

self.on_inbound_connection(connection_id, pending.connection, pending.address);

Ok(())
}

fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
self.pending_inbound_connections
.remove(&connection_id)
.map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(()))
self.pending_inbound_connections.remove(&connection_id).map_or_else(
|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot reject non existent pending connection",
);

Err(Error::ConnectionDoesntExist(connection_id))
},
|_| Ok(()),
)
}

fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
Expand Down Expand Up @@ -529,7 +543,23 @@ impl Stream for TcpTransport {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) {
return match event {
None | Some(Err(_)) => Poll::Ready(None),
None => {
tracing::error!(
target: LOG_TARGET,
"TCP listener terminated, ignore if the node is stopping",
);
Comment on lines +547 to +550
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this error actually ever emitted?

Copy link
Collaborator Author

@lexnv lexnv Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edit: I did not see this error while testing, I believe this can still happen from the docs: https://docs.rs/tokio-stream/0.1.16/tokio_stream/wrappers/struct.TcpListenerStream.html#method.poll_next


Poll::Ready(None)
}
Some(Err(error)) => {
tracing::error!(
target: LOG_TARGET,
?error,
"TCP listener terminated with error",
);

Poll::Ready(None)
}
Some(Ok((connection, address))) => {
let connection_id = self.context.next_connection_id();
tracing::trace!(
Expand Down
58 changes: 47 additions & 11 deletions src/transport/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,20 +429,34 @@ impl Transport for WebSocketTransport {
}

fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
let pending = self
.pending_inbound_connections
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let pending = self.pending_inbound_connections.remove(&connection_id).ok_or_else(|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot accept non existent pending connection",
);

Error::ConnectionDoesntExist(connection_id)
})?;

self.on_inbound_connection(connection_id, pending.connection, pending.address);

Ok(())
}

fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
self.pending_open
.remove(&connection_id)
.map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(()))
self.pending_inbound_connections.remove(&connection_id).map_or_else(
|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot reject non existent pending connection",
);

Err(Error::ConnectionDoesntExist(connection_id))
},
|_| Ok(()),
)
}

fn open(
Expand Down Expand Up @@ -573,11 +587,33 @@ impl Stream for WebSocketTransport {
type Item = TransportEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(connection)) = self.listener.poll_next_unpin(cx) {
return match connection {
Err(_) => Poll::Ready(None),
Ok((connection, address)) => {
if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) {
return match event {
None => {
tracing::error!(
target: LOG_TARGET,
"Websocket listener terminated, ignore if the node is stopping",
);

Poll::Ready(None)
}
Some(Err(error)) => {
tracing::error!(
target: LOG_TARGET,
?error,
"Websocket listener terminated with error",
);

Poll::Ready(None)
}
Some(Ok((connection, address))) => {
let connection_id = self.context.next_connection_id();
tracing::trace!(
target: LOG_TARGET,
?connection_id,
?address,
"pending inbound Websocket connection",
);

self.pending_inbound_connections.insert(
connection_id,
Expand Down