transports: Fix missing Poll::Ready(None) event from listener #285

Merged: 8 commits, Nov 12, 2024
60 changes: 52 additions & 8 deletions src/transport/tcp/mod.rs
@@ -378,20 +378,34 @@ impl Transport for TcpTransport {
}

fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
let pending = self
.pending_inbound_connections
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let pending = self.pending_inbound_connections.remove(&connection_id).ok_or_else(|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot accept non existent pending connection",
);

Error::ConnectionDoesntExist(connection_id)
})?;

self.on_inbound_connection(connection_id, pending.connection, pending.address);

Ok(())
}

fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
self.pending_inbound_connections
.remove(&connection_id)
.map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(()))
self.pending_inbound_connections.remove(&connection_id).map_or_else(
|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot reject non existent pending connection",
);

Err(Error::ConnectionDoesntExist(connection_id))
},
|_| Ok(()),
)
}

fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
@@ -529,7 +543,23 @@ impl Stream for TcpTransport {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) {
return match event {
None | Some(Err(_)) => Poll::Ready(None),
None => {
tracing::error!(
target: LOG_TARGET,
"TCP listener terminated, ignore if the node is stopping",
);
Comment on lines +547 to +550
Collaborator

Is this error actually ever emitted?

Collaborator Author

@lexnv lexnv Nov 12, 2024

Edit: I did not see this error while testing, but judging from the docs I believe it can still happen: https://docs.rs/tokio-stream/0.1.16/tokio_stream/wrappers/struct.TcpListenerStream.html#method.poll_next


Poll::Ready(None)
}
Some(Err(error)) => {
tracing::error!(
target: LOG_TARGET,
?error,
"TCP listener terminated with error",
);

Poll::Ready(None)
}
Some(Ok((connection, address))) => {
let connection_id = self.context.next_connection_id();
tracing::trace!(
@@ -554,9 +584,16 @@ impl Stream for TcpTransport {
};
}

// Whenever we are receiving events by `Poll::Ready` and we choose to not propagate
// them to the higher levels, we should wake up the context to poll us again.
// Otherwise, the scheduler will not know that we are ready to be polled again.
let mut should_wake_up = false;
Collaborator Author

I think this should be reverted; ideally either self.listener / self.pending_raw_connections / self.pending_connections would save the waker.
Wdyt? @dmitry-markin 🙏

Collaborator

It doesn't seem that we swallow any pending events from self.listener. As for self.pending_raw_connections / self.pending_connections, we had better save a waker and trigger it only when new connections are added, not on every poll_next call.

Collaborator Author

Actually, I'll revert this for now and come back with a follow-up. I believe this is needed, and it may be part of the reason we experience connection stability issues 🙏
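For readers following this thread, here is a minimal sketch of the alternative being discussed: the pending collection saves the waker when it is polled empty and wakes it only when a new element is added. `PendingQueue` and `CountingWaker` are hypothetical names made up for illustration; they are not litep2p types, and the real collections in this PR may behave differently.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a collection such as `pending_connections`.
pub struct PendingQueue<T> {
    items: VecDeque<T>,
    /// Waker saved by the most recent poll that returned `Poll::Pending`.
    waker: Option<Waker>,
}

impl<T> PendingQueue<T> {
    pub fn new() -> Self {
        Self { items: VecDeque::new(), waker: None }
    }

    /// Add an item and wake the task that was parked on the empty queue, if any.
    pub fn push(&mut self, item: T) {
        self.items.push_back(item);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Return the next item, or save the waker and report `Poll::Pending`.
    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<T> {
        match self.items.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                self.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Helper for experiments: counts how many times the task was woken.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

With this shape, a wake-up happens once per `push` that follows a `Pending` poll, rather than on every `poll_next` call that drained something.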


while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {
tracing::trace!(target: LOG_TARGET, ?result, "raw connection result");

should_wake_up |= true;

match result {
RawConnectionResult::Connected {
connection_id,
@@ -617,6 +654,8 @@ impl Stream for TcpTransport {
}

while let Poll::Ready(Some(connection)) = self.pending_connections.poll_next_unpin(cx) {
should_wake_up |= true;

match connection {
Ok(connection) => {
let peer = connection.peer();
@@ -643,6 +682,11 @@ impl Stream for TcpTransport {
}
}

// We have filtered out all `Poll::Ready` events.
if should_wake_up {
cx.waker().wake_by_ref();
}

Poll::Pending
}
}
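The `should_wake_up` pattern in the hunks above can be looked at in isolation. The sketch below is a simplified, std-only model, not the transport itself: `FilteringTransport` and `WakeCounter` are hypothetical names, and a plain `VecDeque` stands in for the inner streams. It shows that a poll which consumes events internally and still returns `Poll::Pending` requests exactly one re-poll, while a poll that consumed nothing requests none.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical transport that handles queued results internally
/// and never surfaces them to the caller.
pub struct FilteringTransport {
    internal: VecDeque<u32>,
    pub handled: usize,
}

impl FilteringTransport {
    pub fn new(events: Vec<u32>) -> Self {
        Self { internal: events.into(), handled: 0 }
    }

    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let mut should_wake_up = false;

        // Drain ready events without propagating them upward.
        while let Some(_result) = self.internal.pop_front() {
            self.handled += 1;
            should_wake_up = true;
        }

        // Events were consumed but nothing is returned, so ask to be polled again.
        if should_wake_up {
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}

/// Helper for experiments: counts wake-ups requested through the waker.
pub struct WakeCounter(pub AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

The trade-off raised in review still applies: this wakes the task from inside `poll_next`, whereas saving a waker moves the wake-up to the point where new work arrives.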
72 changes: 61 additions & 11 deletions src/transport/websocket/mod.rs
@@ -429,20 +429,34 @@ impl Transport for WebSocketTransport {
}

fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
let pending = self
.pending_inbound_connections
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let pending = self.pending_inbound_connections.remove(&connection_id).ok_or_else(|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot accept non existent pending connection",
);

Error::ConnectionDoesntExist(connection_id)
})?;

self.on_inbound_connection(connection_id, pending.connection, pending.address);

Ok(())
}

fn reject_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
self.pending_open
.remove(&connection_id)
.map_or(Err(Error::ConnectionDoesntExist(connection_id)), |_| Ok(()))
self.pending_inbound_connections.remove(&connection_id).map_or_else(
|| {
tracing::error!(
target: LOG_TARGET,
?connection_id,
"Cannot reject non existent pending connection",
);

Err(Error::ConnectionDoesntExist(connection_id))
},
|_| Ok(()),
)
}

fn open(
@@ -573,11 +587,33 @@ impl Stream for WebSocketTransport {
type Item = TransportEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(connection)) = self.listener.poll_next_unpin(cx) {
return match connection {
Err(_) => Poll::Ready(None),
Ok((connection, address)) => {
if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) {
return match event {
None => {
tracing::error!(
target: LOG_TARGET,
"Websocket listener terminated, ignore if the node is stopping",
);

Poll::Ready(None)
}
Some(Err(error)) => {
tracing::error!(
target: LOG_TARGET,
?error,
"Websocket listener terminated with error",
);

Poll::Ready(None)
}
Some(Ok((connection, address))) => {
let connection_id = self.context.next_connection_id();
tracing::trace!(
target: LOG_TARGET,
?connection_id,
?address,
"pending inbound Websocket connection",
);

self.pending_inbound_connections.insert(
connection_id,
@@ -594,9 +630,16 @@ impl Stream for WebSocketTransport {
};
}

// Whenever we are receiving events by `Poll::Ready` and we choose to not propagate
// them to the higher levels, we should wake up the context to poll us again.
// Otherwise, the scheduler will not know that we are ready to be polled again.
let mut should_wake_up = false;

while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {
tracing::trace!(target: LOG_TARGET, ?result, "raw connection result");

should_wake_up |= true;

match result {
RawConnectionResult::Connected {
connection_id,
@@ -657,6 +700,8 @@ impl Stream for WebSocketTransport {
}

while let Poll::Ready(Some(connection)) = self.pending_connections.poll_next_unpin(cx) {
should_wake_up |= true;

match connection {
Ok(connection) => {
let peer = connection.peer();
@@ -683,6 +728,11 @@ impl Stream for WebSocketTransport {
}
}

// We have filtered out all `Poll::Ready` events.
if should_wake_up {
cx.waker().wake_by_ref();
}

Collaborator

The same here: we had better save a waker and trigger it only when adding new elements to pending(_raw)_connections.

Poll::Pending
}
}
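To make the title's "missing Poll::Ready(None)" concrete, the following std-only sketch contrasts the two matching styles visible in the WebSocket hunk. It is an illustration under assumptions: `ListenerOutcome`, `narrow_match`, and `exhaustive_match` are hypothetical names, and `u32`/`String` stand in for the real connection and error types.

```rust
use std::task::Poll;

/// Hypothetical summary of what the transport does with a listener event.
#[derive(Debug, PartialEq)]
pub enum ListenerOutcome {
    /// Nothing was returned; polling continues with the other sources.
    FellThrough,
    /// The listener stream ended.
    Terminated,
    /// The listener reported an error and is treated as ended.
    TerminatedWithError(String),
    /// A new inbound connection was accepted.
    Accepted(u32),
}

/// Mirrors the shape of the older `if let Poll::Ready(Some(..))` check:
/// `Poll::Ready(None)` does not match the pattern and is silently skipped.
pub fn narrow_match(event: Poll<Option<Result<u32, String>>>) -> ListenerOutcome {
    if let Poll::Ready(Some(connection)) = event {
        return match connection {
            Err(error) => ListenerOutcome::TerminatedWithError(error),
            Ok(id) => ListenerOutcome::Accepted(id),
        };
    }
    ListenerOutcome::FellThrough
}

/// Mirrors the shape of the newer `if let Poll::Ready(event)` check:
/// every ready value, including `None`, gets its own arm.
pub fn exhaustive_match(event: Poll<Option<Result<u32, String>>>) -> ListenerOutcome {
    if let Poll::Ready(event) = event {
        return match event {
            None => ListenerOutcome::Terminated,
            Some(Err(error)) => ListenerOutcome::TerminatedWithError(error),
            Some(Ok(id)) => ListenerOutcome::Accepted(id),
        };
    }
    ListenerOutcome::FellThrough
}
```

Passing `Poll::Ready(None)` to both functions shows the difference: the narrow version reports `FellThrough`, the exhaustive one reports `Terminated`.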