Skip to content

Commit

Permalink
fix(circuit-relay): respect applyDefaultLimit when it is false (#2139)
Browse files Browse the repository at this point in the history
We were accidentally applying the default limits if no limits were
set, which is intentionally the case when the `applyDefaultLimit`
config option is set to `false`.
  • Loading branch information
achingbrain committed Oct 9, 2023
1 parent 70dbb97 commit df2153e
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 381 deletions.
43 changes: 21 additions & 22 deletions packages/libp2p/src/circuit-relay/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { anySignal } from 'any-signal'
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import { codes } from '../errors.js'
import { DEFAULT_DATA_LIMIT, DEFAULT_DURATION_LIMIT } from './constants.js'
import type { Limit } from './pb/index.js'
import type { Stream } from '@libp2p/interface/connection'
import type { Source } from 'it-stream-types'
Expand All @@ -13,6 +12,8 @@ import type { Uint8ArrayList } from 'uint8arraylist'
const log = logger('libp2p:circuit-relay:utils')

async function * countStreamBytes (source: Source<Uint8Array | Uint8ArrayList>, limit: { remaining: bigint }): AsyncGenerator<Uint8Array | Uint8ArrayList, void, unknown> {
const limitBytes = limit.remaining

for await (const buf of source) {
const len = BigInt(buf.byteLength)

Expand All @@ -29,15 +30,15 @@ async function * countStreamBytes (source: Source<Uint8Array | Uint8ArrayList>,
log.error(err)
}

throw new Error('data limit exceeded')
throw new CodeError(`data limit of ${limitBytes} bytes exceeded`, codes.ERR_TRANSFER_LIMIT_EXCEEDED)
}

limit.remaining -= len
yield buf
}
}

const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Required<Limit>): void => {
export function createLimitedRelay (src: Stream, dst: Stream, abortSignal: AbortSignal, limit?: Limit): void {
function abortStreams (err: Error): void {
src.abort(err)
dst.abort(err)
Expand All @@ -47,25 +48,33 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
const abortController = new AbortController()
const signal = anySignal([abortSignal, abortController.signal])

const timeout = setTimeout(() => {
abortController.abort()
}, limit.duration)
let timeout: ReturnType<typeof setTimeout> | undefined

if (limit?.duration != null) {
timeout = setTimeout(() => {
abortController.abort()
}, limit.duration)
}

let srcDstFinished = false
let dstSrcFinished = false

const dataLimit = {
remaining: limit.data
let dataLimit: { remaining: bigint } | undefined

if (limit?.data != null) {
dataLimit = {
remaining: limit.data
}
}

queueMicrotask(() => {
const onAbort = (): void => {
dst.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT))
dst.abort(new CodeError(`duration limit of ${limit?.duration} ms exceeded`, codes.ERR_TRANSFER_LIMIT_EXCEEDED))
}

signal.addEventListener('abort', onAbort, { once: true })

void dst.sink(countStreamBytes(src.source, dataLimit))
void dst.sink(dataLimit == null ? src.source : countStreamBytes(src.source, dataLimit))
.catch(err => {
log.error('error while relaying streams src -> dst', err)
abortStreams(err)
Expand All @@ -83,12 +92,12 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ

queueMicrotask(() => {
const onAbort = (): void => {
src.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT))
src.abort(new CodeError(`duration limit of ${limit?.duration} ms exceeded`, codes.ERR_TRANSFER_LIMIT_EXCEEDED))
}

signal.addEventListener('abort', onAbort, { once: true })

void src.sink(countStreamBytes(dst.source, dataLimit))
void src.sink(dataLimit == null ? dst.source : countStreamBytes(dst.source, dataLimit))
.catch(err => {
log.error('error while relaying streams dst -> src', err)
abortStreams(err)
Expand All @@ -105,16 +114,6 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
})
}

export function createLimitedRelay (source: Stream, destination: Stream, abortSignal: AbortSignal, limit?: Limit): void {
const dataLimit = limit?.data ?? BigInt(DEFAULT_DATA_LIMIT)
const durationLimit = limit?.duration ?? DEFAULT_DURATION_LIMIT

doRelay(source, destination, abortSignal, {
data: dataLimit,
duration: durationLimit
})
}

/**
* Convert a namespace string into a cid
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ export enum codes {
ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS',
ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS',
ERR_CONNECTION_DENIED = 'ERR_CONNECTION_DENIED',
ERR_TRANSFER_LIMIT_EXCEEDED = 'ERR_TRANSFER_LIMIT_EXCEEDED',
ERR_TRANSFER_LIMIT_EXCEEDED = 'ERR_TRANSFER_LIMIT_EXCEEDED'
}
Loading

0 comments on commit df2153e

Please sign in to comment.