Skip to content

Commit

Permalink
feat!: close streams gracefully
Browse files Browse the repository at this point in the history
- Updates all libp2p related deps
- Refactors `YamuxStream` class to extend `AbstractStream` similar to other stream muxers
- Stream close methods are now async

BREAKING CHANGE: stream close methods are now asyc, requires libp2p@0.46.x or later
  • Loading branch information
achingbrain committed Aug 1, 2023
1 parent dc55dcf commit 61ba47b
Show file tree
Hide file tree
Showing 13 changed files with 270 additions and 366 deletions.
19 changes: 8 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,24 +171,21 @@
"docs": "aegir docs"
},
"dependencies": {
"@libp2p/interface-connection": "^5.1.0",
"@libp2p/interface-stream-muxer": "^4.1.2",
"@libp2p/interfaces": "^3.3.2",
"@libp2p/logger": "^2.0.7",
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
"it-pushable": "^3.1.3",
"it-pushable": "^3.2.0",
"uint8arraylist": "^2.4.3"
},
"devDependencies": {
"@dapplion/benchmark": "^0.2.4",
"@libp2p/interface-stream-muxer-compliance-tests": "^7.0.3",
"@libp2p/mplex": "^8.0.3",
"aegir": "^39.0.7",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/mplex": "^9.0.0",
"aegir": "^40.0.1",
"it-drain": "^3.0.2",
"it-pair": "^2.0.6",
"it-stream-types": "^2.0.1"
},
"browser": {}
}
}
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js'

Expand Down
2 changes: 1 addition & 1 deletion src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'
import { Uint8ArrayList } from 'uint8arraylist'
import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js'
import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js'
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Yamux } from './muxer.js'
import type { YamuxMuxerInit } from './muxer.js'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
export { GoAwayCode } from './frame.js'

export function yamux (init: YamuxMuxerInit = {}): () => StreamMuxerFactory {
Expand Down
100 changes: 57 additions & 43 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { anySignal, type ClearableSignal } from 'any-signal'
import { pipe } from 'it-pipe'
import { pushable, type Pushable } from 'it-pushable'
import { type Config, defaultConfig, verifyConfig } from './config.js'
Expand All @@ -9,13 +9,14 @@ import { Decoder } from './decode.js'
import { encodeHeader } from './encode.js'
import { Flag, type FrameHeader, FrameType, GoAwayCode, stringifyHeader } from './frame.js'
import { StreamState, YamuxStream } from './stream.js'
import type { Stream } from '@libp2p/interface-connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import type { Logger } from '@libp2p/logger'
import type { AbortOptions } from '@libp2p/interface'
import type { Stream } from '@libp2p/interface/connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Sink, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'
const CLOSE_TIMEOUT = 500

export interface YamuxMuxerInit extends StreamMuxerInit, Partial<Config> {
}
Expand All @@ -36,12 +37,15 @@ export class Yamux implements StreamMuxerFactory {
}
}

export interface CloseOptions extends AbortOptions {
reason?: GoAwayCode
}

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly _init: YamuxMuxerInit
private readonly config: Config
private readonly log?: Logger

Expand Down Expand Up @@ -75,7 +79,6 @@ export class YamuxMuxer implements StreamMuxer {
private readonly onStreamEnd?: (stream: Stream) => void

constructor (init: YamuxMuxerInit) {
this._init = init
this.client = init.direction === 'outbound'
this.config = { ...defaultConfig, ...init }
this.log = this.config.log
Expand All @@ -89,22 +92,19 @@ export class YamuxMuxer implements StreamMuxer {
this._streams = new Map()

this.source = pushable({
onEnd: (err?: Error): void => {
onEnd: (): void => {
this.log?.trace('muxer source ended')
this.close(err)

this._streams.forEach(stream => {
stream.destroy()
})
}
})

this.sink = async (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> => {
let signal: ClearableSignal | undefined

if (this._init.signal != null) {
signal = anySignal([this.closeController.signal, this._init.signal])
}

source = abortableSource(
source,
signal ?? this.closeController.signal,
this.closeController.signal,
{ returnOnAbort: true }
)

Expand Down Expand Up @@ -133,15 +133,15 @@ export class YamuxMuxer implements StreamMuxer {
}

error = err as Error
} finally {
if (signal != null) {
signal.clear()
}
}

this.log?.trace('muxer sink ended')

this.close(error, reason)
if (error != null) {
this.abort(error, reason)
} else {
await this.close({ reason })
}
}

this.numInboundStreams = 0
Expand Down Expand Up @@ -261,34 +261,48 @@ export class YamuxMuxer implements StreamMuxer {

/**
* Close the muxer
*
* @param err
* @param reason - The GoAway reason to be sent
*/
close (err?: Error, reason?: GoAwayCode): void {
async close (options: CloseOptions = {}): Promise<void> {
if (this.closeController.signal.aborted) {
// already closed
return
}

// If reason was provided, use that, otherwise use the presence of `err` to determine the reason
reason = reason ?? (err === undefined ? GoAwayCode.InternalError : GoAwayCode.NormalTermination)
const reason = options?.reason ?? GoAwayCode.NormalTermination

if (err != null) {
this.log?.error('muxer close reason=%s error=%s', GoAwayCode[reason], err)
} else {
this.log?.trace('muxer close reason=%s', GoAwayCode[reason])
this.log?.trace('muxer close reason=%s', reason)

options.signal = options.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT)

try {
// If err is provided, abort all underlying streams, else close all underlying streams
await Promise.all(
[...this._streams.values()].map(async s => s.close(options))
)

// send reason to the other side, allow the other side to close gracefully
this.sendGoAway(reason)

this._closeMuxer()
} catch (err: any) {
this.abort(err)
}
}

// If err is provided, abort all underlying streams, else close all underlying streams
if (err === undefined) {
for (const stream of this._streams.values()) {
stream.close()
}
} else {
for (const stream of this._streams.values()) {
stream.abort(err)
}
abort (err: Error, reason?: GoAwayCode): void {
if (this.closeController.signal.aborted) {
// already closed
return
}

reason = reason ?? GoAwayCode.InternalError

// If reason was provided, use that, otherwise use the presence of `err` to determine the reason
this.log?.error('muxer abort reason=%s error=%s', reason, err)

// Abort all underlying streams
for (const stream of this._streams.values()) {
stream.abort(err)
}

// send reason to the other side, allow the other side to close gracefully
Expand Down Expand Up @@ -319,16 +333,16 @@ export class YamuxMuxer implements StreamMuxer {
}

const stream = new YamuxStream({
id,
id: id.toString(),
name,
state,
direction,
sendFrame: this.sendFrame.bind(this),
onStreamEnd: () => {
onEnd: () => {
this.closeStream(id)
this.onStreamEnd?.(stream)
},
log: this.log,
log: logger(`libp2p:yamux:${direction}:${id}`),
config: this.config,
getRTT: this.getRTT.bind(this)
})
Expand Down
Loading

0 comments on commit 61ba47b

Please sign in to comment.