Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: close read and write streams (#170)
Browse files Browse the repository at this point in the history
This also now throws an error when a write is attempted on a non existent stream. Previously we would just send the message, but this is against the mplex protocol.

Refs: #120
Supersedes: #115
  • Loading branch information
achingbrain committed May 23, 2022
1 parent a50edeb commit 3917968
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 12 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
"varint": "^6.0.0"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.1.32",
"@libp2p/interfaces": "^1.3.31",
"@libp2p/interface-compliance-tests": "^2.0.1",
"@libp2p/interfaces": "^2.0.1",
"@types/varint": "^6.0.0",
"aegir": "^37.0.10",
"cborg": "^1.8.1",
Expand Down
22 changes: 19 additions & 3 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
import { trackedMap } from '@libp2p/tracked-map'
import { logger } from '@libp2p/logger'
import errCode from 'err-code'
import type { Components } from '@libp2p/interfaces/components'
import type { Sink } from 'it-stream-types'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
Expand Down Expand Up @@ -130,6 +131,10 @@ export class MplexStreamMuxer implements StreamMuxer {
}

const send = (msg: Message) => {
if (!registry.has(id)) {
throw errCode(new Error('the stream is not in the muxer registry, it may have already been closed'), 'ERR_STREAM_DOESNT_EXIST')
}

if (log.enabled) {
log.trace('%s stream %s send', type, id, printMessage(msg))
}
Expand Down Expand Up @@ -196,10 +201,18 @@ export class MplexStreamMuxer implements StreamMuxer {
const { initiators, receivers } = this._streams
// Abort all the things!
for (const s of initiators.values()) {
s.abort(err)
if (err != null) {
s.abort(err)
} else {
s.close()
}
}
for (const s of receivers.values()) {
s.abort(err)
if (err != null) {
s.abort(err)
} else {
s.close()
}
}
}
const source = pushableV<Message>({ onEnd })
Expand Down Expand Up @@ -241,14 +254,17 @@ export class MplexStreamMuxer implements StreamMuxer {
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
// We got data from the remote, push it into our local stream
stream.source.push(message.data.slice())
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
// We should expect no more data from the remote, stop reading
stream.closeRead()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
// Stop reading and writing to the stream immediately
stream.reset()
break
default:
Expand Down
60 changes: 55 additions & 5 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const log = logger('libp2p:mplex:stream')

const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET'
const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'
const ERR_MPLEX_SINK_ENDED = 'ERR_MPLEX_SINK_ENDED'

export interface Options {
id: number
Expand All @@ -31,6 +32,7 @@ export function createStream (options: Options): MplexStream {

const abortController = new AbortController()
const resetController = new AbortController()
const closeController = new AbortController()
const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes
const externalId = type === 'initiator' ? (`i${id}`) : `r${id}`
const streamName = `${name == null ? id : name}`
Expand All @@ -49,7 +51,7 @@ export function createStream (options: Options): MplexStream {
}

sourceEnded = true
log.trace('%s stream %s source end', type, streamName, err)
log.trace('%s stream %s source end - err: %o', type, streamName, err)

if (err != null && endErr == null) {
endErr = err
Expand Down Expand Up @@ -85,30 +87,71 @@ export function createStream (options: Options): MplexStream {
}
}

const stream = {
// Close for reading
const stream: MplexStream = {
// Close for both Reading and Writing
close: () => {
log.trace('%s stream %s close', type, streamName)

stream.closeRead()
stream.closeWrite()
},

// Close for reading
closeRead: () => {
log.trace('%s stream %s closeRead', type, streamName)

if (sourceEnded) {
return
}

stream.source.end()
},

// Close for writing
closeWrite: () => {
log.trace('%s stream %s closeWrite', type, streamName)

if (sinkEnded) {
return
}

closeController.abort()

try {
send({ id, type: Types.CLOSE })
} catch (err) {
log.trace('%s stream %s error sending close', type, name, err)
}

onSinkEnd()
},

// Close for reading and writing (local error)
abort: (err?: Error) => {
abort: (err: Error) => {
log.trace('%s stream %s abort', type, streamName, err)
// End the source with the passed error
stream.source.end(err)
abortController.abort()
onSinkEnd(err)
},

// Close immediately for reading and writing (remote error)
reset: () => {
const err = errCode(new Error('stream reset'), ERR_MPLEX_STREAM_RESET)
resetController.abort()
stream.source.end(err)
onSinkEnd(err)
},

sink: async (source: Source<Uint8Array>) => {
if (sinkEnded) {
throw errCode(new Error('stream closed for writing'), ERR_MPLEX_SINK_ENDED)
}

source = abortableSource(source, anySignal([
abortController.signal,
resetController.signal
resetController.signal,
closeController.signal
]))

try {
Expand All @@ -135,6 +178,10 @@ export function createStream (options: Options): MplexStream {
}
} catch (err: any) {
if (err.type === 'aborted' && err.message === 'The operation was aborted') {
if (closeController.signal.aborted) {
return
}

if (resetController.signal.aborted) {
err.message = 'stream reset'
err.code = ERR_MPLEX_STREAM_RESET
Expand Down Expand Up @@ -171,10 +218,13 @@ export function createStream (options: Options): MplexStream {

onSinkEnd()
},

source: pushable<Uint8Array>({
onEnd: onSourceEnd
}),

timeline,

id: externalId
}

Expand Down
4 changes: 2 additions & 2 deletions test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver

// when the initiator sends a CLOSE message, we call close
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
receiver.close()
void receiver.closeRead()
}

// when the initiator sends a RESET message, we call close
Expand All @@ -114,7 +114,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver

// when the receiver sends a CLOSE message, we call close
if (msg.type === MessageTypes.CLOSE_RECEIVER) {
initiator.close()
void initiator.close()
}

// when the receiver sends a RESET message, we call close
Expand Down

0 comments on commit 3917968

Please sign in to comment.