Skip to content

Commit

Permalink
added idle_timeout and test for swarm builder
Browse files Browse the repository at this point in the history
  • Loading branch information
startup-dreamer committed Jul 11, 2023
1 parent b6b8844 commit 073cee1
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
42 changes: 40 additions & 2 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ where

local_supported_protocols: HashSet<StreamProtocol>,
remote_supported_protocols: HashSet<StreamProtocol>,
idle_timeout: Duration,
}

impl<THandler> fmt::Debug for Connection<THandler>
Expand Down Expand Up @@ -176,15 +177,16 @@ where
mut handler: THandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
idle_timeout: Option<Duration>,
) -> Self {
let initial_protocols = gather_supported_protocols(&handler);

if !initial_protocols.is_empty() {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(
ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)),
));
}

let timeout = idle_timeout.unwrap_or_else(|| Duration::new(0, 0));
Connection {
muxing: muxer,
handler,
Expand All @@ -196,6 +198,7 @@ where
requested_substreams: Default::default(),
local_supported_protocols: initial_protocols,
remote_supported_protocols: Default::default(),
idle_timeout: timeout,
}
}

Expand Down Expand Up @@ -227,6 +230,7 @@ where
substream_upgrade_protocol_override,
local_supported_protocols: supported_protocols,
remote_supported_protocols,
idle_timeout,
} = self.get_mut();

loop {
Expand Down Expand Up @@ -351,7 +355,16 @@ where
*shutdown = Shutdown::Later(Delay::new(dur), t)
}
}
(_, KeepAlive::No) => *shutdown = Shutdown::Asap,
(_, KeepAlive::No) => {
// handle idle_timeout
let duration = *idle_timeout; // Default timeout is 0 seconds
if duration > Duration::new(0, 0) {
let deadline = Instant::now() + duration;
*shutdown = Shutdown::Later(Delay::new(duration), deadline);
} else {
*shutdown = Shutdown::Asap;
}
}
(_, KeepAlive::Yes) => *shutdown = Shutdown::None,
};

Expand Down Expand Up @@ -713,6 +726,7 @@ mod tests {
keep_alive::ConnectionHandler,
None,
max_negotiating_inbound_streams,
None,
);

let result = connection.poll_noop_waker();
Expand All @@ -736,6 +750,7 @@ mod tests {
MockConnectionHandler::new(upgrade_timeout),
None,
2,
None,
);

connection.handler.open_new_outbound();
Expand All @@ -750,6 +765,27 @@ mod tests {
StreamUpgradeError::Timeout
))
}

#[test]
fn test_idle_timeout() {
// Create a custom idle timeout
let idle_timeout = Duration::from_secs(5);

let mut connection = Connection::new(
StreamMuxerBox::new(PendingStreamMuxer),
ConfigurableProtocolConnectionHandler::default(),
None,
0,
Some(idle_timeout),
);

// Create a mock context and pin the connection
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let connection = Pin::new(&mut connection);

let poll_result = connection.poll(&mut cx);
assert!(poll_result.is_pending());
}

#[test]
fn propagates_changes_to_supported_inbound_protocols() {
Expand All @@ -758,6 +794,7 @@ mod tests {
ConfigurableProtocolConnectionHandler::default(),
None,
0,
None,
);

// First, start listening on a single protocol.
Expand Down Expand Up @@ -796,6 +833,7 @@ mod tests {
ConfigurableProtocolConnectionHandler::default(),
None,
0,
None,
);

// First, remote supports a single protocol.
Expand Down
4 changes: 3 additions & 1 deletion swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use futures::{
ready,
stream::FuturesUnordered,
};
use instant::Instant;
use instant::{Duration, Instant};
use libp2p_core::connection::Endpoint;
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
use std::task::Waker;
Expand Down Expand Up @@ -492,6 +492,7 @@ where
endpoint: &ConnectedPoint,
connection: NewConnection,
handler: THandler,
idle_timeout: Option<Duration>,
) {
let connection = connection.extract();

Expand All @@ -518,6 +519,7 @@ where
handler,
self.substream_upgrade_protocol_override,
self.max_negotiating_inbound_streams,
idle_timeout,
);

self.executor.spawn(task::new_for_established_connection(
Expand Down
2 changes: 1 addition & 1 deletion swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ where
.expect("n + 1 is always non-zero; qed");

self.pool
.spawn_connection(id, peer_id, &endpoint, connection, handler);
.spawn_connection(id, peer_id, &endpoint, connection, handler, None);

log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}.",
Expand Down

0 comments on commit 073cee1

Please sign in to comment.