From c9ba86744780e8ed12abd0a293acb543d500b8c3 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 14 Jun 2022 11:36:33 +0200 Subject: [PATCH] Check opened streams periodically Fixes issue #10 Pull request https://github.com/libp2p/go-libp2p-core/pull/250 removed the OpenStream notification. Punchr uses the notification here: https://github.com/dennis-tra/punchr/blob/c4b2fcf93e63ae4a8656864a399638c0c9641372/cmd/client/host.go#L372 --- cmd/client/host.go | 104 ++++++++++++++++----------------------------- 1 file changed, 37 insertions(+), 67 deletions(-) diff --git a/cmd/client/host.go b/cmd/client/host.go index bd38d47..2088dfe 100644 --- a/cmd/client/host.go +++ b/cmd/client/host.go @@ -24,6 +24,7 @@ import ( ) var ( + Version = "0.2.0" CommunicationTimeout = 15 * time.Second RetryCount = 3 ) @@ -33,7 +34,6 @@ type Host struct { host.Host holePunchEventsPeers sync.Map - streamOpenPeers sync.Map } func InitHost(ctx context.Context, privKey crypto.PrivKey) (*Host, error) { @@ -41,13 +41,12 @@ func InitHost(ctx context.Context, privKey crypto.PrivKey) (*Host, error) { h := &Host{ holePunchEventsPeers: sync.Map{}, - streamOpenPeers: sync.Map{}, } // Configure new libp2p host libp2pHost, err := libp2p.New( libp2p.Identity(privKey), - libp2p.UserAgent("punchr/go-client/0.1.0"), + libp2p.UserAgent("punchr/go-client/"+Version), libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"), libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic"), libp2p.ListenAddrStrings("/ip6/::/tcp/0"), @@ -184,13 +183,13 @@ func (h *Host) HolePunch(ctx context.Context, addrInfo peer.AddrInfo) *HolePunch for i := 0; i < RetryCount; i++ { // wait for the DCUtR stream to be opened select { - case <-h.WaitForDCUtRStream(addrInfo.ID): - // pass - case <-time.After(CommunicationTimeout): - // Stream was not opened in time by the remote. - hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_NO_STREAM - hpState.Error = "/libp2p/dcutr stream was not opened after " + CommunicationTimeout.String() - return hpState + case _, ok := <-h.WaitForDCUtRStream(addrInfo.ID): + if !ok { + // Stream was not opened in time by the remote. + hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_NO_STREAM + hpState.Error = "/libp2p/dcutr stream was not opened after " + CommunicationTimeout.String() + return hpState + } case <-ctx.Done(): hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_CANCELLED hpState.Error = ctx.Err().Error() @@ -274,25 +273,39 @@ func (hps HolePunchState) TrackHolePunch(ctx context.Context, remoteID peer.ID, } func (h *Host) WaitForDCUtRStream(pid peer.ID) <-chan struct{} { - evtChan := make(chan struct{}) - h.streamOpenPeers.Store(pid, evtChan) + dcutrOpenedChan := make(chan struct{}) - // Exit early if the DCUtR stream is already open - for _, conn := range h.Network().ConnsToPeer(pid) { - for _, stream := range conn.GetStreams() { - if stream.Protocol() == holepunch.Protocol { - // If not found, it was already closed by the open stream handler - if _, found := h.streamOpenPeers.LoadAndDelete(pid); !found { - return evtChan + // The following go routine is a hack. We want to be notified as soon as the remote peer has opened a dcutr stream. + // go-libp2p v0.20.0 has removed the OpenedStream notification (which didn't really work anyway). Now we check + // every 10 ms all streams on all connections for the /libp2p/dcutr stream. If + go func() { + timeout := time.After(CommunicationTimeout) + timer := time.NewTimer(0) + for { + select { + case <-timeout: + close(dcutrOpenedChan) + return + case <-timer.C: + } + + for _, conn := range h.Network().ConnsToPeer(pid) { + for _, stream := range conn.GetStreams() { + if stream.Protocol() != holepunch.Protocol { + continue + } + h.logEntry(pid).Debugln("/libp2p/dcutr stream opened!") + dcutrOpenedChan <- struct{}{} + close(dcutrOpenedChan) + return } - close(evtChan) - return evtChan } + timer.Reset(10 * time.Millisecond) } - } + }() h.logEntry(pid).Infoln("Waiting for /libp2p/dcutr stream...") - return evtChan + return dcutrOpenedChan } func (h *Host) RegisterPeerToTrace(pid peer.ID) <-chan *holepunch.Event { @@ -312,7 +325,7 @@ func (h *Host) UnregisterPeerToTrace(pid peer.ID) { for { select { case evt := <-evtChan: - h.logEntry(pid).WithField("evtType", evt.Type).Infoln("Draining event channel") + h.logEntry(pid).WithField("evtType", evt.Type).Warnln("Draining event channel") default: close(evtChan) return @@ -362,46 +375,3 @@ func (h *Host) Listen(network.Network, multiaddr.Multiaddr) {} func (h *Host) ListenClose(network.Network, multiaddr.Multiaddr) {} func (h *Host) Connected(network.Network, network.Conn) {} func (h *Host) Disconnected(network.Network, network.Conn) {} -func (h *Host) ClosedStream(_ network.Network, stream network.Stream) { - if stream.Protocol() != holepunch.Protocol { - return - } - h.logEntry(stream.Conn().RemotePeer()).Debugln("/libp2p/dcutr stream closed!") -} - -func (h *Host) OpenedStream(_ network.Network, stream network.Stream) { - // The following is a hack. `stream` does not have the `Protocol` field set yet. So we just check - // every 5 ms for a total of 15 s. - go func() { - timeout := time.After(CommunicationTimeout) - timer := time.NewTimer(0) - for { - - select { - case <-timeout: - return - case <-timer.C: - } - - if stream.Protocol() == "" { - timer.Reset(5 * time.Millisecond) - continue - } - - if stream.Protocol() != holepunch.Protocol { - return - } - - break - - } - - val, found := h.streamOpenPeers.LoadAndDelete(stream.Conn().RemotePeer()) - if !found { - return - } - - h.logEntry(stream.Conn().RemotePeer()).Debugln("/libp2p/dcutr stream opened!") - close(val.(chan struct{})) - }() -}