Skip to content

Commit

Permalink
Debug for upgrade timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
BlackYoup committed Jun 30, 2020
1 parent c9f1212 commit eefd240
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 38 deletions.
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ thiserror = "1.0"
unsigned-varint = "0.4"
void = "1"
zeroize = "1"
uuid = { version = "0.8", features = [ "v4" ] }

[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies]
ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ where
}

// Poll the handler for new events.
let ids = self.handler.get_negotiating_out_ids();
log::trace!("Polling ConnectionHandler. io_pending={}, handler_negotiating_out={:?}", io_pending, ids);
match self.handler.poll(cx) {
Poll::Pending => {
if io_pending {
Expand Down
4 changes: 4 additions & 0 deletions core/src/connection/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub trait ConnectionHandler {
/// Returning an error will close the connection to the remote.
fn poll(&mut self, cx: &mut Context)
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>;

fn get_negotiating_out_ids(&self) -> Option<Vec<uuid::Uuid>> {
None
}
}

/// Prototype for a `ConnectionHandler`.
Expand Down
16 changes: 11 additions & 5 deletions core/src/upgrade/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ where
U: OutboundUpgrade<Negotiated<C>>
{
let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
let future = multistream_select::dialer_select_proto(conn, iter, v);
let id = uuid::Uuid::new_v4();
let future = multistream_select::dialer_select_proto(conn, iter, v, id);
OutboundUpgradeApply {
inner: OutboundUpgradeApplyState::Init { future, upgrade: up }
inner: OutboundUpgradeApplyState::Init { future, upgrade: up },
id,
}
}

Expand Down Expand Up @@ -149,7 +151,8 @@ where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>>
{
inner: OutboundUpgradeApplyState<C, U>
inner: OutboundUpgradeApplyState<C, U>,
pub id: uuid::Uuid,
}

enum OutboundUpgradeApplyState<C, U>
Expand Down Expand Up @@ -189,9 +192,11 @@ where
Poll::Ready(x) => x,
Poll::Pending => {
self.inner = OutboundUpgradeApplyState::Init { future, upgrade };
log::debug!("{} init still pending", self.id);
return Poll::Pending
}
};
log::debug!("{} Init ready", self.id);
self.inner = OutboundUpgradeApplyState::Upgrade {
future: Box::pin(upgrade.upgrade_outbound(connection, info.0))
};
Expand All @@ -200,14 +205,15 @@ where
match Future::poll(Pin::new(&mut future), cx) {
Poll::Pending => {
self.inner = OutboundUpgradeApplyState::Upgrade { future };
log::debug!("{} upgrade still pending", self.id);
return Poll::Pending
}
Poll::Ready(Ok(x)) => {
debug!("Successfully applied negotiated protocol");
debug!("{} Successfully applied negotiated protocol", self.id);
return Poll::Ready(Ok(x))
}
Poll::Ready(Err(e)) => {
debug!("Failed to apply negotiated protocol");
log::error!("{} Failed to apply negotiated protocol", self.id);
return Poll::Ready(Err(UpgradeError::Apply(e)));
}
}
Expand Down
1 change: 1 addition & 0 deletions misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ log = "0.4"
pin-project = "0.4.17"
smallvec = "1.0"
unsigned-varint = "0.4"
uuid = { version = "0.8", features = [ "v4" ] }

[dev-dependencies]
async-std = "1.5.0"
Expand Down
30 changes: 21 additions & 9 deletions misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ use std::{convert::TryFrom as _, io, iter, mem, pin::Pin, task::{Context, Poll}}
pub fn dialer_select_proto<R, I>(
inner: R,
protocols: I,
version: Version
version: Version,
id: uuid::Uuid,
) -> DialerSelectFuture<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
Expand All @@ -59,8 +60,10 @@ where
let iter = protocols.into_iter();
// We choose between the "serial" and "parallel" strategies based on the number of protocols.
if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) {
Either::Left(dialer_select_proto_serial(inner, iter, version))
log::debug!("{} Protos serial", id);
Either::Left(dialer_select_proto_serial(inner, iter, version, id))
} else {
log::debug!("{} Protos parallel", id);
Either::Right(dialer_select_proto_parallel(inner, iter, version))
}
}
Expand All @@ -80,7 +83,8 @@ pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPa
pub fn dialer_select_proto_serial<R, I>(
inner: R,
protocols: I,
version: Version
version: Version,
id: uuid::Uuid,
) -> DialerSelectSeq<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
Expand All @@ -93,7 +97,8 @@ where
protocols,
state: SeqState::SendHeader {
io: MessageIO::new(inner),
}
},
id
}
}

Expand Down Expand Up @@ -139,6 +144,7 @@ where
protocols: iter::Peekable<I>,
state: SeqState<R, I::Item>,
version: Version,
id: uuid::Uuid,
}

enum SeqState<R, N>
Expand Down Expand Up @@ -198,15 +204,15 @@ where
if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) {
return Poll::Ready(Err(From::from(err)));
}
log::debug!("Dialer: Proposed protocol: {}", p);
log::debug!("{} Dialer: Proposed protocol: {}, peek={:?}, version={:?}", this.id, p, this.protocols.peek().is_some(), this.version);

if this.protocols.peek().is_some() {
*this.state = SeqState::FlushProtocol { io, protocol }
} else {
match this.version {
Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol },
Version::V1Lazy => {
log::debug!("Dialer: Expecting proposed protocol: {}", p);
log::debug!("{} Dialer: Expecting proposed protocol: {}", this.id, p);
let io = Negotiated::expecting(io.into_reader(), p, *this.version);
return Poll::Ready(Ok((protocol, io)))
}
Expand All @@ -215,19 +221,23 @@ where
}

SeqState::FlushProtocol { mut io, protocol } => {
log::debug!("{} Flush Protocol", this.id);
match Pin::new(&mut io).poll_flush(cx)? {
Poll::Ready(()) => *this.state = SeqState::AwaitProtocol { io, protocol },
Poll::Pending => {
log::debug!("{} Flush protocol pending", this.id);
*this.state = SeqState::FlushProtocol { io, protocol };
return Poll::Pending
},
}
}

SeqState::AwaitProtocol { mut io, protocol } => {
log::debug!("{} Await protocol", this.id);
let msg = match Pin::new(&mut io).poll_next(cx)? {
Poll::Ready(Some(msg)) => msg,
Poll::Pending => {
log::debug!("{} Await protocol Pending", this.id);
*this.state = SeqState::AwaitProtocol { io, protocol };
return Poll::Pending
}
Expand All @@ -236,19 +246,21 @@ where
io::Error::from(io::ErrorKind::UnexpectedEof)))),
};

log::debug!("{} Await protocol ready {:?}", this.id, msg);

match msg {
Message::Header(v) if v == *this.version => {
*this.state = SeqState::AwaitProtocol { io, protocol };
}
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
log::debug!("Dialer: Received confirmation for protocol: {}", p);
log::debug!("{} Dialer: Received confirmation for protocol: {}", this.id, p);
let (io, remaining) = io.into_inner();
let io = Negotiated::completed(io, remaining);
return Poll::Ready(Ok((protocol, io)));
}
Message::NotAvailable => {
log::debug!("Dialer: Received rejection of protocol: {}",
String::from_utf8_lossy(protocol.as_ref()));
log::debug!("{} Dialer: Received rejection of protocol: {}",
this.id, String::from_utf8_lossy(protocol.as_ref()));
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
*this.state = SeqState::SendProtocol { io, protocol }
}
Expand Down
16 changes: 16 additions & 0 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,19 @@ where C: AsyncRead + AsyncWrite + Unpin,
}

if let Some(out) = filter(&elem) {
trace!("Filter pass for {:?}", elem);
return Poll::Ready(Ok(out));
} else {
let endpoint = elem.endpoint().unwrap_or(Endpoint::Dialer);
if inner.opened_substreams.contains(&(elem.substream_id(), !endpoint)) || elem.is_open_msg() {
inner.buffer.push(elem);
} else if !elem.is_close_or_reset_msg() {
debug!("Ignored message {:?} because the substream wasn't open", elem);
} else {
match elem {
codec::Elem::Close { .. } => {},
_ => unreachable!("what about the message?")
}
}
}
}
Expand Down Expand Up @@ -477,6 +483,7 @@ where C: AsyncRead + AsyncWrite + Unpin
if !substream.current_data.is_empty() {
let len = cmp::min(substream.current_data.len(), buf.len());
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
trace!("Poll::Ready for substream {}", substream.num);
return Poll::Ready(Ok(len));
}

Expand All @@ -488,6 +495,15 @@ where C: AsyncRead + AsyncWrite + Unpin
// Try to find a packet of data in the buffer.
let mut inner = self.inner.lock();
let next_data_poll = next_match(&mut inner, cx, |elem| {
trace!("Filtering. elem={:?}, substream_num={}, endpoint={:?}", elem, substream.num, substream.endpoint);
match elem {
codec::Elem::Data { substream_id, .. } => {
if *substream_id != substream.num {
//panic!("Differ");
}
},
_ => {}
};
match elem {
codec::Elem::Data { substream_id, endpoint, data, .. }
if *substream_id == substream.num && *endpoint != substream.endpoint => // see note [StreamId]
Expand Down
1 change: 1 addition & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ wasm-timer = "0.2"
uint = "0.8"
unsigned-varint = { version = "0.4", features = ["futures-codec"] }
void = "1.0"
uuid = "0.8"

[dev-dependencies]
futures-timer = "3.0"
Expand Down
4 changes: 2 additions & 2 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,8 +1454,8 @@ where
}

KademliaHandlerEvent::QueryError { user_data, error } => {
log::debug!("Request to {:?} in query {:?} failed with {:?}",
source, user_data, error);
log::debug!("Request to {:?} in query {:?} failed with {:?}. Query={:?}",
source, user_data, error, self.queries.get(&user_data).as_ref().map(|q| q.inner.info.clone()));
// If the query to which the error relates is still active,
// signal the failure w.r.t. `source`.
if let Some(query) = self.queries.get_mut(&user_data) {
Expand Down
Loading

0 comments on commit eefd240

Please sign in to comment.