diff --git a/packages/libp2p/src/circuit-relay/utils.ts b/packages/libp2p/src/circuit-relay/utils.ts index 810b0894a5..550c80a177 100644 --- a/packages/libp2p/src/circuit-relay/utils.ts +++ b/packages/libp2p/src/circuit-relay/utils.ts @@ -4,7 +4,6 @@ import { anySignal } from 'any-signal' import { CID } from 'multiformats/cid' import { sha256 } from 'multiformats/hashes/sha2' import { codes } from '../errors.js' -import { DEFAULT_DATA_LIMIT, DEFAULT_DURATION_LIMIT } from './constants.js' import type { Limit } from './pb/index.js' import type { Stream } from '@libp2p/interface/connection' import type { Source } from 'it-stream-types' @@ -13,6 +12,8 @@ import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:circuit-relay:utils') async function * countStreamBytes (source: Source, limit: { remaining: bigint }): AsyncGenerator { + const limitBytes = limit.remaining + for await (const buf of source) { const len = BigInt(buf.byteLength) @@ -29,7 +30,7 @@ async function * countStreamBytes (source: Source, log.error(err) } - throw new Error('data limit exceeded') + throw new CodeError(`data limit of ${limitBytes} bytes exceeded`, codes.ERR_TRANSFER_LIMIT_EXCEEDED) } limit.remaining -= len @@ -37,7 +38,7 @@ async function * countStreamBytes (source: Source, } } -const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Required): void => { +export function createLimitedRelay (src: Stream, dst: Stream, abortSignal: AbortSignal, limit?: Limit): void { function abortStreams (err: Error): void { src.abort(err) dst.abort(err) @@ -47,25 +48,33 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ const abortController = new AbortController() const signal = anySignal([abortSignal, abortController.signal]) - const timeout = setTimeout(() => { - abortController.abort() - }, limit.duration) + let timeout: ReturnType | undefined + + if (limit?.duration != null) { + timeout = setTimeout(() => { + abortController.abort() + }, limit.duration) + } let srcDstFinished = false let dstSrcFinished = false - const dataLimit = { - remaining: limit.data + let dataLimit: { remaining: bigint } | undefined + + if (limit?.data != null) { + dataLimit = { + remaining: limit.data + } } queueMicrotask(() => { const onAbort = (): void => { - dst.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT)) + dst.abort(new CodeError(`duration limit of ${limit?.duration} ms exceeded`, codes.ERR_TRANSFER_LIMIT_EXCEEDED)) } signal.addEventListener('abort', onAbort, { once: true }) - void dst.sink(countStreamBytes(src.source, dataLimit)) + void dst.sink(dataLimit == null ? src.source : countStreamBytes(src.source, dataLimit)) .catch(err => { log.error('error while relaying streams src -> dst', err) abortStreams(err) @@ -83,12 +92,12 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ queueMicrotask(() => { const onAbort = (): void => { - src.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT)) + src.abort(new CodeError(`duration limit of ${limit?.duration} ms exceeded`, codes.ERR_TRANSFER_LIMIT_EXCEEDED)) } signal.addEventListener('abort', onAbort, { once: true }) - void src.sink(countStreamBytes(dst.source, dataLimit)) + void src.sink(dataLimit == null ? dst.source : countStreamBytes(dst.source, dataLimit)) .catch(err => { log.error('error while relaying streams dst -> src', err) abortStreams(err) @@ -105,16 +114,6 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ }) } -export function createLimitedRelay (source: Stream, destination: Stream, abortSignal: AbortSignal, limit?: Limit): void { - const dataLimit = limit?.data ?? BigInt(DEFAULT_DATA_LIMIT) - const durationLimit = limit?.duration ?? DEFAULT_DURATION_LIMIT - - doRelay(source, destination, abortSignal, { - data: dataLimit, - duration: durationLimit - }) -} - /** * Convert a namespace string into a cid */ diff --git a/packages/libp2p/src/errors.ts b/packages/libp2p/src/errors.ts index 7fc428b6d4..a2d2ffb6c6 100644 --- a/packages/libp2p/src/errors.ts +++ b/packages/libp2p/src/errors.ts @@ -75,5 +75,5 @@ export enum codes { ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS', ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS', ERR_CONNECTION_DENIED = 'ERR_CONNECTION_DENIED', - ERR_TRANSFER_LIMIT_EXCEEDED = 'ERR_TRANSFER_LIMIT_EXCEEDED', + ERR_TRANSFER_LIMIT_EXCEEDED = 'ERR_TRANSFER_LIMIT_EXCEEDED' } diff --git a/packages/libp2p/test/circuit-relay/relay.node.ts b/packages/libp2p/test/circuit-relay/relay.node.ts index 4ddd793806..936dfbb869 100644 --- a/packages/libp2p/test/circuit-relay/relay.node.ts +++ b/packages/libp2p/test/circuit-relay/relay.node.ts @@ -8,22 +8,91 @@ import { Circuit } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import delay from 'delay' +import all from 'it-all' import { pipe } from 'it-pipe' import { pbStream } from 'it-protobuf-stream' import defer from 'p-defer' import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { Uint8ArrayList } from 'uint8arraylist' -import { RELAY_V2_HOP_CODEC } from '../../src/circuit-relay/constants.js' +import { DEFAULT_DATA_LIMIT, RELAY_V2_HOP_CODEC } from '../../src/circuit-relay/constants.js' import { circuitRelayServer, type CircuitRelayService, circuitRelayTransport } from '../../src/circuit-relay/index.js' import { HopMessage, Status } from '../../src/circuit-relay/pb/index.js' import { identifyService } from '../../src/identify/index.js' -import { createLibp2p } from '../../src/index.js' +import { createLibp2p, type Libp2pOptions } from '../../src/index.js' import { plaintext } from '../../src/insecure/index.js' import { discoveredRelayConfig, doesNotHaveRelay, getRelayAddress, hasRelay, usingAsRelay } from './utils.js' +import type { Components } from '../../src/components.js' import type { Libp2p } from '@libp2p/interface' import type { Connection } from '@libp2p/interface/connection' +async function createClient (options: Libp2pOptions = {}): Promise { + return createLibp2p({ + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0'] + }, + transports: [ + tcp(), + circuitRelayTransport() + ], + streamMuxers: [ + yamux(), + mplex() + ], + connectionEncryption: [ + plaintext() + ], + connectionManager: { + minConnections: 0 + }, + services: { + identify: identifyService() + }, + ...options + }) +} + +async function createRelay (options: Libp2pOptions = {}): Promise> { + return createLibp2p({ + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0'] + }, + transports: [ + tcp(), + circuitRelayTransport() + ], + streamMuxers: [ + yamux(), + mplex() + ], + connectionEncryption: [ + plaintext() + ], + ...options, + services: { + relay: circuitRelayServer(), + identify: identifyService(), + ...(options.services ?? {}) + } + }) +} + +const ECHO_PROTOCOL = '/test/echo/1.0.0' +const echoService = (components: Components): unknown => { + return { + async start () { + await components.registrar.handle(ECHO_PROTOCOL, ({ stream }) => { + void pipe( + stream, stream + ) + }, { + runOnTransientConnection: true + }) + }, + stop () {} + } +} + describe('circuit-relay', () => { describe('flows with 1 listener', () => { let local: Libp2p @@ -34,104 +103,37 @@ describe('circuit-relay', () => { beforeEach(async () => { // create 1 node and 3 relays [local, relay1, relay2, relay3] = await Promise.all([ - createLibp2p({ - connectionManager: { - minConnections: 0 - }, - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createClient({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } + ] }), - createLibp2p({ - connectionManager: { - minConnections: 0 - }, - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createRelay({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - relay: circuitRelayServer(), - identify: identifyService() - } + ] }), - createLibp2p({ - connectionManager: { - minConnections: 0 - }, - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createRelay({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - relay: circuitRelayServer(), - identify: identifyService() - } + ] }), - createLibp2p({ - connectionManager: { - minConnections: 0 - }, - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createRelay({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - relay: circuitRelayServer(), - identify: identifyService() - } + ] }) ]) }) @@ -348,113 +350,45 @@ describe('circuit-relay', () => { beforeEach(async () => { [local, remote, relay1, relay2, relay3] = await Promise.all([ - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createClient({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createClient({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createRelay({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - relay: circuitRelayServer(), - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createRelay({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - relay: circuitRelayServer(), - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createRelay({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - relay: circuitRelayServer(), - identify: identifyService() - } + ] }) ]) }) @@ -764,72 +698,29 @@ describe('circuit-relay', () => { beforeEach(async () => { [local, remote, relay] = await Promise.all([ - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createClient({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createClient({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transports: [ - tcp(), - circuitRelayTransport({ - discoverRelays: 1 - }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], + createRelay({ services: { relay: circuitRelayServer({ reservations: { defaultDataLimit: 1024n } - }), - identify: identifyService() + }) } }) ]) @@ -895,72 +786,29 @@ describe('circuit-relay', () => { beforeEach(async () => { [local, remote, relay] = await Promise.all([ - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createClient({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, + createClient({ transports: [ tcp(), circuitRelayTransport({ discoverRelays: 1 }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } + ] }), - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transports: [ - tcp(), - circuitRelayTransport({ - discoverRelays: 1 - }) - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], + createRelay({ services: { relay: circuitRelayServer({ reservations: { defaultDurationLimit: 1000 } - }), - identify: identifyService() + }) } }) ]) @@ -1028,66 +876,15 @@ describe('circuit-relay', () => { let relay: Libp2p<{ relay: CircuitRelayService }> beforeEach(async () => { - relay = await createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transports: [ - tcp(), - circuitRelayTransport() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - relay: circuitRelayServer(), - identify: identifyService() - } - }) + relay = await createRelay() ;[local, remote] = await Promise.all([ - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transports: [ - tcp(), - circuitRelayTransport() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } - }), - createLibp2p({ + createClient(), + createClient({ addresses: { listen: [ `${relay.getMultiaddrs()[0].toString()}/p2p-circuit` ] - }, - transports: [ - tcp(), - circuitRelayTransport() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() } }) ]) @@ -1115,66 +912,64 @@ describe('circuit-relay', () => { let relay: Libp2p<{ relay: CircuitRelayService }> beforeEach(async () => { - relay = await createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transports: [ - tcp(), - circuitRelayTransport() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], + relay = await createRelay() + + ;[local, remote] = await Promise.all([ + createClient(), + createClient({ + addresses: { + listen: [ + `${relay.getMultiaddrs()[0].toString().split('/p2p')[0]}/p2p-circuit` + ] + } + }) + ]) + }) + + afterEach(async () => { + // Stop each node + await Promise.all([local, remote, relay].map(async libp2p => { + if (libp2p != null) { + await libp2p.stop() + } + })) + }) + + it('should be able to dial remote on preconfigured relay address', async () => { + const ma = getRelayAddress(remote) + + await expect(local.dial(ma)).to.eventually.be.ok() + }) + }) + + describe('unlimited relay', () => { + let local: Libp2p + let remote: Libp2p + let relay: Libp2p<{ relay: CircuitRelayService }> + const defaultDurationLimit = 100 + + beforeEach(async () => { + relay = await createRelay({ services: { - relay: circuitRelayServer(), - identify: identifyService() + relay: circuitRelayServer({ + reservations: { + defaultDurationLimit, + applyDefaultLimit: false + } + }) } }) ;[local, remote] = await Promise.all([ - createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transports: [ - tcp(), - circuitRelayTransport() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - identify: identifyService() - } - }), - createLibp2p({ + createClient(), + createClient({ addresses: { listen: [ `${relay.getMultiaddrs()[0].toString().split('/p2p')[0]}/p2p-circuit` ] }, - transports: [ - tcp(), - circuitRelayTransport() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], services: { - identify: identifyService() + echoService } }) ]) @@ -1189,10 +984,59 @@ describe('circuit-relay', () => { })) }) - it('should be able to dial remote on preconfigured relay address', async () => { + it('should not apply a data limit', async () => { const ma = getRelayAddress(remote) - await expect(local.dial(ma)).to.eventually.be.ok() + const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, { + runOnTransientConnection: true + }) + + // write more than the default data limit + const data = new Uint8Array(Number(DEFAULT_DATA_LIMIT * 2n)) + + const result = await pipe( + [data], + stream, + async (source) => new Uint8ArrayList(...(await all(source))) + ) + + expect(result.subarray()).to.equalBytes(data) + }) + + it('should not apply a time limit', async () => { + const ma = getRelayAddress(remote) + + const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, { + runOnTransientConnection: true + }) + + let finished = false + + setTimeout(() => { + finished = true + }, defaultDurationLimit * 5) + + const start = Date.now() + let finish = 0 + + await pipe( + async function * () { + while (true) { + yield new Uint8Array() + await delay(10) + + if (finished) { + finish = Date.now() + break + } + } + }, + stream + ) + + // default time limit is set to 100ms so the stream should have been open + // for longer than that + expect(finish - start).to.be.greaterThan(defaultDurationLimit) }) }) })