diff --git a/core/Cargo.toml b/core/Cargo.toml index 0d9eb3f73bf..39b9236829c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -34,6 +34,7 @@ thiserror = "1.0" unsigned-varint = "0.4" void = "1" zeroize = "1" +uuid = { version = "0.8", features = [ "v4" ] } [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false } diff --git a/core/src/connection.rs b/core/src/connection.rs index deaa272a7e0..89f3cb705a9 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -259,6 +259,8 @@ where } // Poll the handler for new events. + let ids = self.handler.get_negotiating_out_ids(); + log::trace!("Polling ConnectionHandler. io_pending={}, handler_negotiating_out={:?}", io_pending, ids); match self.handler.poll(cx) { Poll::Pending => { if io_pending { diff --git a/core/src/connection/handler.rs b/core/src/connection/handler.rs index 0379ace170b..9a78567e076 100644 --- a/core/src/connection/handler.rs +++ b/core/src/connection/handler.rs @@ -63,6 +63,10 @@ pub trait ConnectionHandler { /// Returning an error will close the connection to the remote. fn poll(&mut self, cx: &mut Context) -> Poll, Self::Error>>; + + fn get_negotiating_out_ids(&self) -> Option> { + None + } } /// Prototype for a `ConnectionHandler`. diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index f3bee044379..54b91cd45f7 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -61,9 +61,11 @@ where U: OutboundUpgrade> { let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); - let future = multistream_select::dialer_select_proto(conn, iter, v); + let id = uuid::Uuid::new_v4(); + let future = multistream_select::dialer_select_proto(conn, iter, v, id); OutboundUpgradeApply { - inner: OutboundUpgradeApplyState::Init { future, upgrade: up } + inner: OutboundUpgradeApplyState::Init { future, upgrade: up }, + id, } } @@ -149,7 +151,8 @@ where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade> { - inner: OutboundUpgradeApplyState + inner: OutboundUpgradeApplyState, + pub id: uuid::Uuid, } enum OutboundUpgradeApplyState @@ -189,9 +192,11 @@ where Poll::Ready(x) => x, Poll::Pending => { self.inner = OutboundUpgradeApplyState::Init { future, upgrade }; + log::debug!("{} init still pending", self.id); return Poll::Pending } }; + log::debug!("{} Init ready", self.id); self.inner = OutboundUpgradeApplyState::Upgrade { future: Box::pin(upgrade.upgrade_outbound(connection, info.0)) }; @@ -200,14 +205,15 @@ where match Future::poll(Pin::new(&mut future), cx) { Poll::Pending => { self.inner = OutboundUpgradeApplyState::Upgrade { future }; + log::debug!("{} upgrade still pending", self.id); return Poll::Pending } Poll::Ready(Ok(x)) => { - debug!("Successfully applied negotiated protocol"); + debug!("{} Successfully applied negotiated protocol", self.id); return Poll::Ready(Ok(x)) } Poll::Ready(Err(e)) => { - debug!("Failed to apply negotiated protocol"); + log::error!("{} Failed to apply negotiated protocol", self.id); return Poll::Ready(Err(UpgradeError::Apply(e))); } } diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 802e21fd13e..b7d819c806b 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -16,6 +16,7 @@ log = "0.4" pin-project = "0.4.17" smallvec = "1.0" unsigned-varint = "0.4" +uuid = { version = "0.8", features = [ "v4" ] } [dev-dependencies] async-std = "1.5.0" diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 1f7bffda175..40a28ce80ec 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -49,7 +49,8 @@ use std::{convert::TryFrom as _, io, iter, mem, pin::Pin, task::{Context, Poll}} pub fn dialer_select_proto( inner: R, protocols: I, - version: Version + version: Version, + id: uuid::Uuid, ) -> DialerSelectFuture where R: AsyncRead + AsyncWrite, @@ -59,8 +60,10 @@ where let iter = protocols.into_iter(); // We choose between the "serial" and "parallel" strategies based on the number of protocols. if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::Left(dialer_select_proto_serial(inner, iter, version)) + log::debug!("{} Protos serial", id); + Either::Left(dialer_select_proto_serial(inner, iter, version, id)) } else { + log::debug!("{} Protos parallel", id); Either::Right(dialer_select_proto_parallel(inner, iter, version)) } } @@ -80,7 +83,8 @@ pub type DialerSelectFuture = Either, DialerSelectPa pub fn dialer_select_proto_serial( inner: R, protocols: I, - version: Version + version: Version, + id: uuid::Uuid, ) -> DialerSelectSeq where R: AsyncRead + AsyncWrite, @@ -93,7 +97,8 @@ where protocols, state: SeqState::SendHeader { io: MessageIO::new(inner), - } + }, + id } } @@ -139,6 +144,7 @@ where protocols: iter::Peekable, state: SeqState, version: Version, + id: uuid::Uuid, } enum SeqState @@ -198,7 +204,7 @@ where if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) { return Poll::Ready(Err(From::from(err))); } - log::debug!("Dialer: Proposed protocol: {}", p); + log::debug!("{} Dialer: Proposed protocol: {}, peek={:?}, version={:?}", this.id, p, this.protocols.peek().is_some(), this.version); if this.protocols.peek().is_some() { *this.state = SeqState::FlushProtocol { io, protocol } @@ -206,7 +212,7 @@ where match this.version { Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol }, Version::V1Lazy => { - log::debug!("Dialer: Expecting proposed protocol: {}", p); + log::debug!("{} Dialer: Expecting proposed protocol: {}", this.id, p); let io = Negotiated::expecting(io.into_reader(), p, *this.version); return Poll::Ready(Ok((protocol, io))) } @@ -215,9 +221,11 @@ where } SeqState::FlushProtocol { mut io, protocol } => { + log::debug!("{} Flush Protocol", this.id); match Pin::new(&mut io).poll_flush(cx)? { Poll::Ready(()) => *this.state = SeqState::AwaitProtocol { io, protocol }, Poll::Pending => { + log::debug!("{} Flush protocol pending", this.id); *this.state = SeqState::FlushProtocol { io, protocol }; return Poll::Pending }, @@ -225,9 +233,11 @@ where } SeqState::AwaitProtocol { mut io, protocol } => { + log::debug!("{} Await protocol", this.id); let msg = match Pin::new(&mut io).poll_next(cx)? { Poll::Ready(Some(msg)) => msg, Poll::Pending => { + log::debug!("{} Await protocol Pending", this.id); *this.state = SeqState::AwaitProtocol { io, protocol }; return Poll::Pending } @@ -236,19 +246,21 @@ where io::Error::from(io::ErrorKind::UnexpectedEof)))), }; + log::debug!("{} Await protocol ready {:?}", this.id, msg); + match msg { Message::Header(v) if v == *this.version => { *this.state = SeqState::AwaitProtocol { io, protocol }; } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { - log::debug!("Dialer: Received confirmation for protocol: {}", p); + log::debug!("{} Dialer: Received confirmation for protocol: {}", this.id, p); let (io, remaining) = io.into_inner(); let io = Negotiated::completed(io, remaining); return Poll::Ready(Ok((protocol, io))); } Message::NotAvailable => { - log::debug!("Dialer: Received rejection of protocol: {}", - String::from_utf8_lossy(protocol.as_ref())); + log::debug!("{} Dialer: Received rejection of protocol: {}", + this.id, String::from_utf8_lossy(protocol.as_ref())); let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; *this.state = SeqState::SendProtocol { io, protocol } } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index ceb77f33bf1..0cd6a0c1fab 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -307,6 +307,7 @@ where C: AsyncRead + AsyncWrite + Unpin, } if let Some(out) = filter(&elem) { + trace!("Filter pass for {:?}", elem); return Poll::Ready(Ok(out)); } else { let endpoint = elem.endpoint().unwrap_or(Endpoint::Dialer); @@ -314,6 +315,11 @@ where C: AsyncRead + AsyncWrite + Unpin, inner.buffer.push(elem); } else if !elem.is_close_or_reset_msg() { debug!("Ignored message {:?} because the substream wasn't open", elem); + } else { + match elem { + codec::Elem::Close { .. } => {}, + _ => unreachable!("what about the message?") + } } } } @@ -477,6 +483,7 @@ where C: AsyncRead + AsyncWrite + Unpin if !substream.current_data.is_empty() { let len = cmp::min(substream.current_data.len(), buf.len()); buf[..len].copy_from_slice(&substream.current_data.split_to(len)); + trace!("Poll::Ready for substream {}", substream.num); return Poll::Ready(Ok(len)); } @@ -488,6 +495,15 @@ where C: AsyncRead + AsyncWrite + Unpin // Try to find a packet of data in the buffer. let mut inner = self.inner.lock(); let next_data_poll = next_match(&mut inner, cx, |elem| { + trace!("Filtering. elem={:?}, substream_num={}, endpoint={:?}", elem, substream.num, substream.endpoint); + match elem { + codec::Elem::Data { substream_id, .. } => { + if *substream_id != substream.num { + //panic!("Differ"); + } + }, + _ => {} + }; match elem { codec::Elem::Data { substream_id, endpoint, data, .. } if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId] diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 26431d43f96..b6aaddb6b3d 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -28,6 +28,7 @@ wasm-timer = "0.2" uint = "0.8" unsigned-varint = { version = "0.4", features = ["futures-codec"] } void = "1.0" +uuid = "0.8" [dev-dependencies] futures-timer = "3.0" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 57811434809..f9e0ffd7496 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1454,8 +1454,8 @@ where } KademliaHandlerEvent::QueryError { user_data, error } => { - log::debug!("Request to {:?} in query {:?} failed with {:?}", - source, user_data, error); + log::debug!("Request to {:?} in query {:?} failed with {:?}. Query={:?}", + source, user_data, error, self.queries.get(&user_data).as_ref().map(|q| q.inner.info.clone())); // If the query to which the error relates is still active, // signal the failure w.r.t. `source`. if let Some(query) = self.queries.get_mut(&user_data) { diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 57f3c55b02b..6af4e43fa0f 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -58,6 +58,8 @@ pub struct KademliaHandler { /// Until when to keep the connection alive. keep_alive: KeepAlive, + + id: uuid::Uuid, } /// Configuration of a [`KademliaHandler`]. @@ -130,6 +132,34 @@ impl SubstreamState { }, } } + + fn get_conn_id(&self) -> Option { + match self { + SubstreamState::InWaitingMessage(id, _) | + SubstreamState::InWaitingUser(id, _) | + SubstreamState::InPendingSend(id, _, _) | + SubstreamState::InPendingFlush(id, _) => Some(id.clone()), + _ => None + } + } +} + +impl fmt::Debug for SubstreamState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SubstreamState::OutPendingOpen(ref msg, _) => f.debug_tuple("OutPendingOpen").field(msg).finish(), + SubstreamState::OutReportError(_, _) => f.debug_tuple("OutReportError").finish(), + SubstreamState::OutPendingSend(_, ref msg, _) => f.debug_tuple("OutPendingSend").field(msg).finish(), + SubstreamState::OutPendingFlush(_, _) => f.debug_tuple("OutPendingFlush").finish(), + SubstreamState::OutWaitingAnswer(_, _) => f.debug_tuple("OutWaitingAnswer").finish(), + SubstreamState::OutClosing(_) => f.debug_tuple("OutClosing").finish(), + SubstreamState::InWaitingMessage(_, _) => f.debug_tuple("InWaitingMessage").finish(), + SubstreamState::InWaitingUser(_, _) => f.debug_tuple("InWaitingUser").finish(), + SubstreamState::InPendingSend(_, _, ref msg) => f.debug_tuple("InPendingSend").field(msg).finish(), + SubstreamState::InPendingFlush(_, _) => f.debug_tuple("InPendingFlush").finish(), + SubstreamState::InClosing(_) => f.debug_tuple("InClosing").finish() + } + } } /// Event produced by the Kademlia handler. @@ -384,6 +414,7 @@ impl KademliaHandler { let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); KademliaHandler { + id: uuid::Uuid::new_v4(), config, next_connec_unique_id: UniqueConnecId(0), substreams: Vec::new(), @@ -425,7 +456,8 @@ where (msg, user_data): Self::OutboundOpenInfo, ) { self.substreams - .push(SubstreamState::OutPendingSend(protocol, msg, user_data)); + .push(SubstreamState::OutPendingSend(protocol, msg.clone(), user_data)); + trace!("{} inject_fully_negotiated_outbound. Number of substreams={:?}, ids={:?}, msg={:?}", self.id, self.substreams.len(), self.substreams.iter().map(|s| s.get_conn_id()).collect::>>(), msg); } fn inject_fully_negotiated_inbound( @@ -444,6 +476,7 @@ where self.next_connec_unique_id.0 += 1; self.substreams .push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); + trace!("{} inject_fully_negociated_inbound. Number of substreams={:?}, ids={:?}", self.id, self.substreams.len(), self.substreams.iter().map(|s| s.get_conn_id()).collect::>>()); } fn inject_event(&mut self, message: KademliaHandlerIn) { @@ -616,6 +649,7 @@ where ) -> Poll< ProtocolsHandlerEvent, > { + trace!("{} poll. substreams={}", self.id, self.substreams.len()); if self.substreams.is_empty() { return Poll::Pending; } @@ -625,22 +659,27 @@ where let mut substream = self.substreams.swap_remove(n); loop { - match advance_substream(substream, self.config.protocol_config.clone(), cx) { + trace!("{} advancing substream. state={:?}, stream_id={:?}", self.id, substream, substream.get_conn_id()); + match advance_substream(substream, self.config.protocol_config.clone(), cx, self.id.clone()) { (Some(new_state), Some(event), _) => { + trace!("{} Advanced with new_state. new_state={:?}", self.id, new_state); self.substreams.push(new_state); return Poll::Ready(event); } (None, Some(event), _) => { + trace!("{} Advanced without new_state. empty={}", self.id, self.substreams.is_empty()); if self.substreams.is_empty() { self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10)); } return Poll::Ready(event); } (Some(new_state), None, false) => { + trace!("{} Advanced with new state and don't poll again. new_state={:?}", self.id, new_state); self.substreams.push(new_state); break; } (Some(new_state), None, true) => { + trace!("{} Advanced with new state and poll again. new_state={:?}", self.id, new_state); substream = new_state; continue; } @@ -680,6 +719,7 @@ fn advance_substream( state: SubstreamState, upgrade: KademliaProtocolConfig, cx: &mut Context, + handler_id: uuid::Uuid, ) -> ( Option>, Option< @@ -693,6 +733,7 @@ fn advance_substream( bool, ) { + let substream_id = state.get_conn_id(); match state { SubstreamState::OutPendingOpen(msg, user_data) => { let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { @@ -777,6 +818,7 @@ fn advance_substream( } SubstreamState::OutWaitingAnswer(mut substream, user_data) => match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { + trace!("{} OutWaitingAnswer Ready: {:?}", handler_id, msg); let new_state = SubstreamState::OutClosing(substream); let event = process_kad_response(msg, user_data); ( @@ -785,12 +827,16 @@ fn advance_substream( true, ) } - Poll::Pending => ( - Some(SubstreamState::OutWaitingAnswer(substream, user_data)), - None, - false, - ), + Poll::Pending => { + trace!("{} OutWaitingAnswer Pending", handler_id); + ( + Some(SubstreamState::OutWaitingAnswer(substream, user_data)), + None, + false, + ) + } Poll::Ready(Some(Err(error))) => { + trace!("{} OutWaitingAnswer Some(Err({:?}))", handler_id, error); let event = KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), user_data, @@ -798,6 +844,7 @@ fn advance_substream( (None, Some(ProtocolsHandlerEvent::Custom(event)), false) } Poll::Ready(None) => { + trace!("{:?} OutWaitingAnswer None", handler_id); let event = KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()), user_data, @@ -816,6 +863,7 @@ fn advance_substream( }, SubstreamState::InWaitingMessage(id, mut substream) => match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { + trace!("{:?} Ready for InWaitingMessage. substream={:?}", handler_id, substream_id); if let Ok(ev) = process_kad_request(msg, id) { ( Some(SubstreamState::InWaitingUser(id, substream)), @@ -826,25 +874,32 @@ fn advance_substream( (Some(SubstreamState::InClosing(substream)), None, true) } } - Poll::Pending => ( - Some(SubstreamState::InWaitingMessage(id, substream)), - None, - false, - ), + Poll::Pending => { + trace!("{:?} Pending for InWaitingMessage. substream={:?}", handler_id, substream_id); + ( + Some(SubstreamState::InWaitingMessage(id, substream)), + None, + false, + ) + }, Poll::Ready(None) => { - trace!("Inbound substream: EOF"); + trace!("{:?} Inbound substream: EOF, substream={:?}", handler_id, substream_id); (None, None, false) } Poll::Ready(Some(Err(e))) => { - trace!("Inbound substream error: {:?}", e); + trace!("{:?} Inbound substream error: {:?}, substream={:?}", handler_id, e, substream_id); (None, None, false) }, }, - SubstreamState::InWaitingUser(id, substream) => ( + SubstreamState::InWaitingUser(id, substream) => + { + trace!("{:?} Ready for InWaitingUser. substream={:?}", handler_id, substream_id); + ( Some(SubstreamState::InWaitingUser(id, substream)), None, false, - ), + ) + } SubstreamState::InPendingSend(id, mut substream, msg) => match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 13b1320a172..eabb78beee7 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -192,6 +192,7 @@ where fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { let mut codec = UviBytes::default(); codec.set_max_len(self.max_packet_size); + log::trace!("upgrade_inbound"); future::ok( Framed::new(incoming, codec) @@ -203,7 +204,9 @@ where future::ready(Ok(io::Cursor::new(buf))) }) .and_then::<_, fn(_) -> _>(|bytes| { - let request = match proto::Message::decode(bytes) { + let decoded = proto::Message::decode(bytes); + log::trace!("Received bytes. decoded={:?}", decoded); + let request = match decoded { Ok(r) => r, Err(err) => return future::ready(Err(err.into())) }; @@ -424,6 +427,7 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message { /// /// Fails if the protobuf message is not a valid and supported Kademlia request message. fn proto_to_req_msg(message: proto::Message) -> Result { + log::trace!("proto_to_req_msg: {:?}", message); let msg_type = proto::message::MessageType::from_i32(message.r#type) .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?; diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index d02ffff7644..ca9ddb962c1 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -17,6 +17,7 @@ rand = "0.7" smallvec = "1.0" wasm-timer = "0.2" void = "1" +uuid = "0.8" [dev-dependencies] libp2p-mplex = { path = "../muxers/mplex" } diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 8b04506170c..23a22bf1d0c 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -98,7 +98,7 @@ where Vec<(InboundUpgradeApply, SendWrapper>, Delay)>, /// Futures that upgrade outgoing substreams. The first element of the tuple is the userdata /// to pass back once successfully opened. - negotiating_out: Vec<( + pub negotiating_out: Vec<( TProtoHandler::OutboundOpenInfo, OutboundUpgradeApply, SendWrapper>, Delay, @@ -225,29 +225,44 @@ where > { // Continue negotiation of newly-opened substreams on the listening side. // We remove each element from `negotiating_in` one by one and add them back if not ready. + log::trace!("We have {} negotiating_in", self.negotiating_in.len()); for n in (0..self.negotiating_in.len()).rev() { + log::trace!("Poll timeout for {}", n); let (mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n); match Future::poll(Pin::new(&mut timeout), cx) { Poll::Ready(_) => continue, Poll::Pending => {}, } + log::trace!("Poll stream for {}", n); match Future::poll(Pin::new(&mut in_progress), cx) { - Poll::Ready(Ok(upgrade)) => - self.handler.inject_fully_negotiated_inbound(upgrade), - Poll::Pending => self.negotiating_in.push((in_progress, timeout)), + Poll::Ready(Ok(upgrade)) => { + log::trace!("Got Poll::Ready(Ok(upgrade))"); + self.handler.inject_fully_negotiated_inbound(upgrade); + } + Poll::Pending => { + log::trace!("Got Poll::Pending"); + self.negotiating_in.push((in_progress, timeout)); + }, // TODO: return a diagnostic event? - Poll::Ready(Err(_err)) => {} + Poll::Ready(Err(_err)) => { + log::error!("Got Poll::Ready(Err(_err))"); + } } } // Continue negotiation of newly-opened substreams. // We remove each element from `negotiating_out` one by one and add them back if not ready. + let ids: Vec = self.negotiating_out.iter().map(|(_, out, _)| out.id.clone()).collect(); + log::trace!("We have {} negotiating_out. outs={:?}", self.negotiating_out.len(), ids); for n in (0..self.negotiating_out.len()).rev() { let (upgr_info, mut in_progress, mut timeout) = self.negotiating_out.swap_remove(n); + log::trace!("Poll timeout for n={}, id={:?}", n, in_progress.id); match Future::poll(Pin::new(&mut timeout), cx) { Poll::Ready(Ok(_)) => { let err = ProtocolsHandlerUpgrErr::Timeout; self.handler.inject_dial_upgrade_error(upgr_info, err); + log::error!("ProcolsHandlerUpgrErr::Timeout. Handler id: {}", in_progress.id); + panic!(); continue; }, Poll::Ready(Err(_)) => { @@ -257,14 +272,19 @@ where }, Poll::Pending => {}, } + log::trace!("Poll future for n={}, id={:?}", n, in_progress.id); match Future::poll(Pin::new(&mut in_progress), cx) { Poll::Ready(Ok(upgrade)) => { + log::trace!("Got Ready for n={}, id={:?}", n, in_progress.id); self.handler.inject_fully_negotiated_outbound(upgrade, upgr_info); } Poll::Pending => { + log::trace!("Got pending for n={}, id={:?}", n, in_progress.id); self.negotiating_out.push((upgr_info, in_progress, timeout)); } Poll::Ready(Err(err)) => { + log::error!("Got err for n={}, id={:?}", n, in_progress.id); + panic!(); let err = ProtocolsHandlerUpgrErr::Upgrade(err); self.handler.inject_dial_upgrade_error(upgr_info, err); } @@ -324,4 +344,8 @@ where Poll::Pending } + + fn get_negotiating_out_ids(&self) -> Option> { + Some(self.negotiating_out.iter().map(|(_, stream, _)| stream.id.clone()).collect()) + } }