Skip to content

Commit

Permalink
fix: WebRTC transport unhandled promise rejection during connect (#2299)
Browse files Browse the repository at this point in the history
Fixes a crash in node where the abort signal passed into a WebRTC
private to private dial would cause an unhandled promise rejection:

```
file:///Users/alex/Documents/Workspaces/ipfs/helia-http-gateway/node_modules/@libp2p/webrtc/dist/src/private-to-private/initiate-connection.js:42
            connectedPromise.reject(new CodeError('SDP handshake aborted', 'ERR_SDP_HANDSHAKE_ABORTED'));
                                    ^

CodeError: SDP handshake aborted
    at EventTarget.sdpAbortedListener (file:///Users/alex/Documents/Workspaces/ipfs/helia-http-gateway/node_modules/@libp2p/webrtc/dist/src/private-to-private/initiate-connection.js:42:37)
    at [nodejs.internal.kHybridDispatch] (node:internal/event_target:807:20)
    at EventTarget.dispatchEvent (node:internal/event_target:742:26)
    at abortSignal (node:internal/abort_controller:369:10)
    at AbortController.abort (node:internal/abort_controller:391:5)
    at EventTarget.onAbort (file:///Users/alex/Documents/Workspaces/ipfs/helia-http-gateway/node_modules/any-signal/dist/src/index.js:8:20)
    at [nodejs.internal.kHybridDispatch] (node:internal/event_target:807:20)
    at EventTarget.dispatchEvent (node:internal/event_target:742:26)
    at abortSignal (node:internal/abort_controller:369:10)
    at AbortController.abort (node:internal/abort_controller:391:5) {
  code: 'ERR_SDP_HANDSHAKE_ABORTED',
  props: {}
}

Node.js v20.8.0
```

Simplifies the connection logic to just use the abort signal to
abort the dial instead of the abort signal and multiple deferred
promises.

---------

Co-authored-by: Chad Nehemiah <chad.nehemiah94@gmail.com>
  • Loading branch information
achingbrain and maschad authored Dec 5, 2023
1 parent 57944fa commit 64a915a
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 78 deletions.
1 change: 0 additions & 1 deletion packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.1.10",
"@multiformats/multiaddr-matcher": "^1.1.0",
"any-signal": "^4.1.1",
"detect-browser": "^5.3.0",
"it-length-prefixed": "^9.0.3",
"it-pipe": "^3.0.1",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { CodeError } from '@libp2p/interface'
import { peerIdFromString } from '@libp2p/peer-id'
import { pbStream } from 'it-protobuf-stream'
import pDefer, { type DeferredPromise } from 'p-defer'
import { type RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import { SIGNALING_PROTO_ID, splitAddr, type WebRTCTransportMetrics } from './transport.js'
import { readCandidatesUntilConnected, resolveOnConnected } from './util.js'
import { readCandidatesUntilConnected } from './util.js'
import type { DataChannelOptions } from '../index.js'
import type { LoggerOptions, Connection } from '@libp2p/interface'
import type { ConnectionManager, IncomingStreamData, TransportManager } from '@libp2p/interface-internal'
Expand Down Expand Up @@ -65,17 +64,8 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
})

const messageStream = pbStream(stream).pb(Message)
const connectedPromise: DeferredPromise<void> = pDefer()
const sdpAbortedListener = (): void => {
connectedPromise.reject(new CodeError('SDP handshake aborted', 'ERR_SDP_HANDSHAKE_ABORTED'))
}

try {
resolveOnConnected(peerConnection, connectedPromise)

// reject the connectedPromise if the signal aborts
signal?.addEventListener('abort', sdpAbortedListener)

// we create the channel so that the RTCPeerConnection has a component for
// which to collect candidates. The label is not relevant to connection
// initiation but can be useful for debugging
Expand All @@ -102,7 +92,7 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
})
}
peerConnection.onicecandidateerror = (event) => {
log('initiator ICE candidate error', event)
log.error('initiator ICE candidate error', event)
}

// create an offer
Expand Down Expand Up @@ -140,7 +130,7 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul

log.trace('initiator read candidates until connected')

await readCandidatesUntilConnected(connectedPromise, peerConnection, messageStream, {
await readCandidatesUntilConnected(peerConnection, messageStream, {
direction: 'initiator',
signal,
log
Expand All @@ -164,8 +154,6 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
stream.abort(err)
throw err
} finally {
// remove event listeners
signal?.removeEventListener('abort', sdpAbortedListener)
peerConnection.onicecandidate = null
peerConnection.onicecandidateerror = null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { pbStream } from 'it-protobuf-stream'
import pDefer, { type DeferredPromise } from 'p-defer'
import { type RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import { readCandidatesUntilConnected, resolveOnConnected } from './util.js'
import { readCandidatesUntilConnected } from './util.js'
import type { Logger } from '@libp2p/interface'
import type { IncomingStreamData } from '@libp2p/interface-internal'

Expand All @@ -20,41 +20,29 @@ export async function handleIncomingStream ({ peerConnection, stream, signal, co
const messageStream = pbStream(stream).pb(Message)

try {
const connectedPromise: DeferredPromise<void> = pDefer()
const answerSentPromise: DeferredPromise<void> = pDefer()

signal.onabort = () => {
connectedPromise.reject(new CodeError('Timed out while trying to connect', 'ERR_TIMEOUT'))
}

// candidate callbacks
peerConnection.onicecandidate = ({ candidate }) => {
answerSentPromise.promise.then(
async () => {
// a null candidate means end-of-candidates, an empty string candidate
// means end-of-candidates for this generation, otherwise this should
// be a valid candidate object
// see - https://www.w3.org/TR/webrtc/#rtcpeerconnectioniceevent
const data = JSON.stringify(candidate?.toJSON() ?? null)

log.trace('recipient sending ICE candidate %s', data)

await messageStream.write({
type: Message.Type.ICE_CANDIDATE,
data
}, {
signal
})
},
(err) => {
log.error('cannot set candidate since sending answer failed', err)
connectedPromise.reject(err)
}
)
// a null candidate means end-of-candidates, an empty string candidate
// means end-of-candidates for this generation, otherwise this should
// be a valid candidate object
// see - https://www.w3.org/TR/webrtc/#rtcpeerconnectioniceevent
const data = JSON.stringify(candidate?.toJSON() ?? null)

log.trace('recipient sending ICE candidate %s', data)

messageStream.write({
type: Message.Type.ICE_CANDIDATE,
data
}, {
signal
})
.catch(err => {
log.error('error sending ICE candidate', err)
})
}

resolveOnConnected(peerConnection, connectedPromise)

// read an SDP offer
const pbOffer = await messageStream.read({
signal
Expand Down Expand Up @@ -90,18 +78,20 @@ export async function handleIncomingStream ({ peerConnection, stream, signal, co
signal
})

await peerConnection.setLocalDescription(answer).catch(err => {
peerConnection.setLocalDescription(answer).then(() => {
answerSentPromise.resolve()
}, err => {
log.error('could not execute setLocalDescription', err)
answerSentPromise.reject(err)
throw new CodeError('Failed to set localDescription', 'ERR_SDP_HANDSHAKE_FAILED')
})

answerSentPromise.resolve()
await answerSentPromise.promise

log.trace('recipient read candidates until connected')

// wait until candidates are connected
await readCandidatesUntilConnected(connectedPromise, peerConnection, messageStream, {
await readCandidatesUntilConnected(peerConnection, messageStream, {
direction: 'recipient',
signal,
log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CodeError } from '@libp2p/interface'
import { CodeError, setMaxListeners } from '@libp2p/interface'
import { type CreateListenerOptions, type DialOptions, transportSymbol, type Transport, type Listener, type Upgrader, type ComponentLogger, type Logger, type Connection, type PeerId, type CounterGroup, type Metrics, type Startable } from '@libp2p/interface'
import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr, type Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -56,6 +56,7 @@ export class WebRTCTransport implements Transport, Startable {
) {
this.log = components.logger.forComponent('libp2p:webrtc')
this.shutdownController = new AbortController()
setMaxListeners(Infinity, this.shutdownController.signal)

if (components.metrics != null) {
this.metrics = {
Expand Down
47 changes: 19 additions & 28 deletions packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { CodeError } from '@libp2p/interface'
import { closeSource } from '@libp2p/utils/close-source'
import { anySignal } from 'any-signal'
import pDefer from 'p-defer'
import { isFirefox } from '../util.js'
import { RTCIceCandidate } from '../webrtc/index.js'
import { Message } from './pb/message.js'
Expand All @@ -12,32 +11,19 @@ export interface ReadCandidatesOptions extends AbortOptions, LoggerOptions {
direction: string
}

export const readCandidatesUntilConnected = async (connectedPromise: DeferredPromise<void>, pc: RTCPeerConnection, stream: MessageStream<Message, Stream>, options: ReadCandidatesOptions): Promise<void> => {
// if we connect, stop trying to read from the stream
const controller = new AbortController()
connectedPromise.promise.then(() => {
controller.abort()
}, () => {
controller.abort()
})

const signal = anySignal([
controller.signal,
options.signal
])

const abortListener = (): void => {
closeSource(stream.unwrap().unwrap().source, options.log)
}

signal.addEventListener('abort', abortListener)

export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream: MessageStream<Message, Stream>, options: ReadCandidatesOptions): Promise<void> => {
try {
const connectedPromise: DeferredPromise<void> = pDefer()
resolveOnConnected(pc, connectedPromise)

// read candidates until we are connected or we reach the end of the stream
while (true) {
// if we connect, stop trying to read from the stream
const message = await Promise.race([
connectedPromise.promise,
stream.read()
stream.read({
signal: options.signal
})
])

// stream ended or we became connected
Expand Down Expand Up @@ -72,15 +58,20 @@ export const readCandidatesUntilConnected = async (connectedPromise: DeferredPro
}
} catch (err) {
options.log.error('%s error parsing ICE candidate', options.direction, err)
} finally {
signal.removeEventListener('abort', abortListener)
signal.clear()

if (options.signal?.aborted === true) {
throw err
}
}
}

export function resolveOnConnected (pc: RTCPeerConnection, promise: DeferredPromise<void>): void {
function getConnectionState (pc: RTCPeerConnection): string {
return isFirefox ? pc.iceConnectionState : pc.connectionState
}

function resolveOnConnected (pc: RTCPeerConnection, promise: DeferredPromise<void>): void {
pc[isFirefox ? 'oniceconnectionstatechange' : 'onconnectionstatechange'] = (_) => {
switch (isFirefox ? pc.iceConnectionState : pc.connectionState) {
switch (getConnectionState(pc)) {
case 'connected':
promise.resolve()
break
Expand Down
34 changes: 34 additions & 0 deletions packages/transport-webrtc/test/peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { defaultLogger, logger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr, type Multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { detect } from 'detect-browser'
import { duplexPair } from 'it-pair/duplex'
import { pbStream } from 'it-protobuf-stream'
Expand Down Expand Up @@ -117,6 +118,39 @@ describe('webrtc basic', () => {
initiator.peerConnection.close()
recipient.peerConnection.close()
})

it('should survive aborting during connection', async () => {
const abortController = new AbortController()
const { initiator, recipient } = await getComponents()

// no existing connection
initiator.connectionManager.getConnections.returns([])

// transport manager dials recipient
initiator.transportManager.dial.resolves(initiator.connection)

const createOffer = initiator.peerConnection.setRemoteDescription.bind(initiator.peerConnection)

initiator.peerConnection.setRemoteDescription = async (name) => {
// the dial is aborted
abortController.abort(new Error('Oh noes!'))
// setting the description takes some time
await delay(100)
return createOffer(name)
}

// signalling stream opens successfully
initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream)

await expect(Promise.all([
initiateConnection({
...initiator,
signal: abortController.signal
}),
handleIncomingStream(recipient)
]))
.to.eventually.be.rejected.with.property('message', 'Oh noes!')
})
})

describe('webrtc receiver', () => {
Expand Down

0 comments on commit 64a915a

Please sign in to comment.