diff --git a/src/circuit/client.ts b/src/circuit/client.ts index 5fd5c41b4a..3646720f3a 100644 --- a/src/circuit/client.ts +++ b/src/circuit/client.ts @@ -1,5 +1,5 @@ import { logger } from '@libp2p/logger' -import { relayV2HopCodec } from './multicodec.js' +import { RELAY_V2_HOP_CODEC } from './multicodec.js' import { getExpiration, namespaceToCid } from './utils.js' import { CIRCUIT_PROTO_CODE, @@ -99,7 +99,7 @@ export class CircuitService extends EventEmitter implement } // Check if it has the protocol - const hasProtocol = protocols.find(protocol => protocol === relayV2HopCodec) + const hasProtocol = protocols.find(protocol => protocol === RELAY_V2_HOP_CODEC) log.trace(`Peer ${peerId.toString()} protocol change`, this.components.getPeerId().toString()) // If no protocol, check if we were keeping the peer before as a listenRelay @@ -252,7 +252,7 @@ export class CircuitService extends EventEmitter implement continue } - const hasProtocol = protocols.find(protocol => protocol === relayV2HopCodec) + const hasProtocol = protocols.find(protocol => protocol === RELAY_V2_HOP_CODEC) // Continue to next if it does not support Hop if (hasProtocol == null) { diff --git a/src/circuit/multicodec.ts b/src/circuit/multicodec.ts index 6287719be3..3be1e364c4 100644 --- a/src/circuit/multicodec.ts +++ b/src/circuit/multicodec.ts @@ -1,4 +1,4 @@ -export const relayV1Codec = '/libp2p/circuit/relay/0.1.0' -export const relayV2HopCodec = '/libp2p/circuit/relay/0.2.0/hop' -export const relayV2StopCodec = '/libp2p/circuit/relay/0.2.0/stop' +export const RELAY_V1_CODEC = '/libp2p/circuit/relay/0.1.0' +export const RELAY_V2_HOP_CODEC = '/libp2p/circuit/relay/0.2.0/hop' +export const RELAY_V2_STOP_CODEC = '/libp2p/circuit/relay/0.2.0/stop' diff --git a/src/circuit/transport.ts b/src/circuit/transport.ts index cf545f3bd2..49e9e1cf79 100644 --- a/src/circuit/transport.ts +++ b/src/circuit/transport.ts @@ -6,7 +6,7 @@ import type { Multiaddr } from '@multiformats/multiaddr' import { multiaddr } from '@multiformats/multiaddr' import { codes } from '../errors.js' import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn' -import { relayV2HopCodec, relayV1Codec, relayV2StopCodec } from './multicodec.js' +import { RELAY_V2_HOP_CODEC, RELAY_V1_CODEC, RELAY_V2_STOP_CODEC } from './multicodec.js' import { createListener } from './listener.js' import { peerIdFromString } from '@libp2p/peer-id' import type { IncomingStreamData } from '@libp2p/interface-registrar' @@ -17,7 +17,14 @@ import { StreamHandlerV2 } from './v2/stream-handler.js' import { StreamHandlerV1 } from './v1/stream-handler.js' import * as CircuitV1Handler from './v1/index.js' import * as CircuitV2Handler from './v2/index.js' +<<<<<<< HEAD import type { ReservationStore } from './v2/reservation-store.js' +======= +import { TimeoutController } from 'timeout-abort-controller' +import type { RelayConfig } from '../index.js' +import { setMaxListeners } from 'events' +import { abortableDuplex } from 'abortable-iterator' +>>>>>>> 5268cb3f (fix: bugs, circuitv1 code) const log = logger('libp2p:circuit') @@ -34,6 +41,7 @@ interface ConnectOptions { ma: Multiaddr disconnectOnFailure: boolean } +<<<<<<< HEAD export class Circuit implements Transport, Startable { private handler?: ConnectionHandler @@ -47,6 +55,17 @@ export class Circuit implements Transport, Startable { this.components = components this._started = false this.reservationStore = new ReservationStore() +======= +export class Circuit implements Transport, Initializable { + private handler?: ConnectionHandler + private components: Components = new Components() + private readonly reservationStore: ReservationStore + private readonly _init: RelayConfig + + constructor (options: RelayConfig) { + this._init = options + this.reservationStore = new ReservationStore(options.limit) +>>>>>>> 5268cb3f (fix: bugs, circuitv1 code) } isStarted() { @@ -60,7 +79,7 @@ export class Circuit implements Transport, Startable { this._started = true - void this.components.getRegistrar().handle(relayV1Codec, (data) => { + void this.components.getRegistrar().handle(RELAY_V1_CODEC, (data) => { void this._onProtocolV1(data).catch(err => { log.error(err) }) @@ -68,7 +87,7 @@ export class Circuit implements Transport, Startable { .catch(err => { log.error(err) }) - void this.components.getRegistrar().handle(relayV2HopCodec, (data) => { + void this.components.getRegistrar().handle(RELAY_V2_HOP_CODEC, (data) => { void this._onV2ProtocolHop(data).catch(err => { log.error(err) }) @@ -76,7 +95,7 @@ export class Circuit implements Transport, Startable { .catch(err => { log.error(err) }) - void this.components.getRegistrar().handle(relayV2StopCodec, (data) => { + void this.components.getRegistrar().handle(RELAY_V2_STOP_CODEC, (data) => { void this._onV2ProtocolStop(data).catch(err => { log.error(err) }) @@ -112,6 +131,7 @@ export class Circuit implements Transport, Startable { async _onProtocolV1(data: IncomingStreamData) { const { connection, stream } = data +<<<<<<< HEAD const streamHandler = new StreamHandlerV1({ stream }) try { const request = await streamHandler.read() @@ -139,6 +159,43 @@ export class Circuit implements Transport, Startable { CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE) } } +======= + const controller = new TimeoutController(this._init.hop.timeout) + + try { + // fails on node < 15.4 + setMaxListeners?.(Infinity, controller.signal) + } catch {} + + try { + const source = abortableDuplex(stream, controller.signal) + const streamHandler = new StreamHandlerV1({ stream: { ...stream, ...source } }) + const request = await streamHandler.read() + + if (request == null) { + log('request was invalid, could not read from stream') + CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE) + return + } + + switch (request.type) { + case CircuitV1.CircuitRelay.Type.CAN_HOP: + case CircuitV1.CircuitRelay.Type.HOP: { + log('received circuit v1 hop request from %p', connection.remotePeer) + CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.HOP_CANT_SPEAK_RELAY) + break + } + case CircuitV1.CircuitRelay.Type.STOP: { + log('received circuit v1 stop request from %p', connection.remotePeer) + CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.STOP_RELAY_REFUSED) + break + } + default: { + log('Request of type %s not supported', request.type) + CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE) + } + } +>>>>>>> 5268cb3f (fix: bugs, circuitv1 code) } finally { controller.clear() } @@ -146,22 +203,40 @@ export class Circuit implements Transport, Startable { async _onV2ProtocolHop({ connection, stream }: IncomingStreamData) { log('received circuit v2 hop protocol stream from %s', connection.remotePeer) - const streamHandler = new StreamHandlerV2({ stream }) - const request = CircuitV2.HopMessage.decode(await streamHandler.read()) + const controller = new TimeoutController(this._init.hop.timeout) - if (request?.type === undefined) { - return - } + try { + // fails on node < 15.4 + setMaxListeners?.(Infinity, controller.signal) + } catch {} - await CircuitV2Handler.handleHopProtocol({ - connection, - streamHandler, - circuit: this, - relayPeer: this.components.getPeerId(), - relayAddrs: this.components.getAddressManager().getListenAddrs(), - reservationStore: this.reservationStore, - request - }) + try { + const source = abortableDuplex(stream, controller.signal) + const streamHandler = new StreamHandlerV2({ stream: { ...stream, ...source } }) + const request = CircuitV2.HopMessage.decode(await streamHandler.read()) + + if (request?.type == null) { + log('request was invalid, could not read from stream') + streamHandler.write(CircuitV2.HopMessage.encode({ + type: CircuitV2.HopMessage.Type.STATUS, + status: CircuitV2.Status.MALFORMED_MESSAGE + })) + streamHandler.close() + return + } + + await CircuitV2Handler.handleHopProtocol({ + connection, + streamHandler, + circuit: this, + relayPeer: this.components.getPeerId(), + relayAddrs: this.components.getAddressManager().getListenAddrs(), + reservationStore: this.reservationStore, + request + }) + } finally { + controller.clear() + } } async _onV2ProtocolStop({ connection, stream }: IncomingStreamData) { @@ -224,11 +299,19 @@ export class Circuit implements Transport, Startable { } try { +<<<<<<< HEAD const stream = await relayConnection.newStream([protocolIDv2Hop, RELAY_V1_CODEC]) switch (stream.protocol) { case relayV1Codec: return await this.connectV1({ stream: stream.stream, +======= + const stream = await relayConnection.newStream([RELAY_V2_HOP_CODEC, RELAY_V1_CODEC]) + + switch (stream.stat.protocol) { + case RELAY_V1_CODEC: return await this.connectV1({ + stream, +>>>>>>> 5268cb3f (fix: bugs, circuitv1 code) connection: relayConnection, destinationPeer, destinationAddr, @@ -236,8 +319,13 @@ export class Circuit implements Transport, Startable { ma, disconnectOnFailure }) +<<<<<<< HEAD case relayV2HopCodec: return await this.connectV2({ stream: stream.stream, +======= + case RELAY_V2_HOP_CODEC: return await this.connectV2({ + stream, +>>>>>>> 5268cb3f (fix: bugs, circuitv1 code) connection: relayConnection, destinationPeer, destinationAddr, diff --git a/src/circuit/v2/hop.ts b/src/circuit/v2/hop.ts index 1fb7c86bac..cce61c7fa7 100644 --- a/src/circuit/v2/hop.ts +++ b/src/circuit/v2/hop.ts @@ -8,7 +8,7 @@ import { StreamHandlerV2 } from './stream-handler.js' import type { Circuit } from '../transport.js' import { Multiaddr } from '@multiformats/multiaddr' import type { Acl, ReservationStore } from './interfaces.js' -import { relayV2HopCodec } from '../multicodec.js' +import { RELAY_V2_HOP_CODEC } from '../multicodec.js' import { validateHopConnectRequest } from './validation.js' import { stop } from './stop.js' import { ReservationVoucherRecord } from './reservation-voucher.js' @@ -42,7 +42,7 @@ export async function handleHopProtocol (options: HopProtocolOptions) { export async function reserve (connection: Connection) { log('requesting reservation from %s', connection.remotePeer) - const { stream } = await connection.newStream([relayV2HopCodec]) + const stream = await connection.newStream([RELAY_V2_HOP_CODEC]) const streamHandler = new StreamHandlerV2({ stream }) streamHandler.write(HopMessage.encode({ type: HopMessage.Type.RESERVE diff --git a/src/circuit/v2/stop.ts b/src/circuit/v2/stop.ts index d68f21cb15..cf57d6133d 100644 --- a/src/circuit/v2/stop.ts +++ b/src/circuit/v2/stop.ts @@ -4,7 +4,7 @@ import type { Connection } from '@libp2p/interfaces/connection' import { logger } from '@libp2p/logger' import { StreamHandlerV2 } from './stream-handler.js' -import { relayV2StopCodec } from '../multicodec.js' +import { RELAY_V2_STOP_CODEC } from '../multicodec.js' import { validateStopConnectRequest } from './validation.js' const log = logger('libp2p:circuitv2:stop') @@ -54,7 +54,7 @@ export async function stop ({ connection, request }: StopOptions) { - const { stream } = await connection.newStream([relayV2StopCodec]) + const stream = await connection.newStream([RELAY_V2_STOP_CODEC]) log('starting circuit relay v2 stop request to %s', connection.remotePeer) const streamHandler = new StreamHandlerV2({ stream }) streamHandler.write(StopMessage.encode(request).finish()) diff --git a/test/circuit/v2/hop.spec.ts b/test/circuit/v2/hop.spec.ts index 2b4bc46644..c3e324955c 100644 --- a/test/circuit/v2/hop.spec.ts +++ b/test/circuit/v2/hop.spec.ts @@ -1,3 +1,4 @@ +<<<<<<< HEAD import { relayV2StopCodec } from './../../../src/circuit/multicodec.js' import { mockDuplex, mockConnection, mockMultiaddrConnection, mockStream } from '@libp2p/interface-compliance-tests/mocks' @@ -9,10 +10,21 @@ import type { Connection } from '@libp2p/interfaces/connection' import type { PeerId } from '@libp2p/interfaces/peer-id' import { Status, StopMessage, HopMessage } from '../../../src/circuit/v2/pb/index.js' import { ReservationStore } from '../../../src/circuit/v2/reservation-store.js' -import sinon from 'sinon' -import { Circuit } from '../../../src/circuit/transport.js' +======= +import type { Connection } from '@libp2p/interface-connection' +import { mockConnection, mockDuplex, mockMultiaddrConnection, mockStream } from '@libp2p/interface-mocks' +import type { PeerId } from '@libp2p/interface-peer-id' import { Multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' import { pair } from 'it-pair' +>>>>>>> 5268cb3f (fix: bugs, circuitv1 code) +import sinon from 'sinon' +import { Circuit } from '../../../src/circuit/transport.js' +import { handleHopProtocol } from '../../../src/circuit/v2/hop.js' +import { HopMessage, Status, StopMessage } from '../../../src/circuit/v2/pb/index.js' +import { ReservationStore } from '../../../src/circuit/v2/reservation-store.js' +import { StreamHandlerV2 } from '../../../src/circuit/v2/stream-handler.js' +import * as peerUtils from '../../utils/creators/peer.js' /* eslint-env mocha */ @@ -147,7 +159,22 @@ describe('Circuit v2 - hop protocol', function () { conn = await mockConnection(mockMultiaddrConnection(mockDuplex(), relayPeer)) streamHandler = new StreamHandlerV2({ stream: mockStream(pair()) }) reservationStore = new ReservationStore() - circuit = new Circuit({}) + circuit = new Circuit({ + enabled: true, + limit: 15, + advertise: { + enabled: false + }, + hop: { + enabled: true, + active: false, + timeout: 30000 + }, + autoRelay: { + enabled: false, + maxListeners: 2 + } + }) }) this.afterEach(async function () { diff --git a/test/configuration/protocol-prefix.node.ts b/test/configuration/protocol-prefix.node.ts index c21cdd5658..a8e72df081 100644 --- a/test/configuration/protocol-prefix.node.ts +++ b/test/configuration/protocol-prefix.node.ts @@ -33,7 +33,7 @@ describe('Protocol prefix is configurable', () => { const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId) expect(protocols).to.include.members([ `/${testProtocol}/fetch/0.0.1`, - '/libp2p/circuit/relay/0.1.0', + '/libp2p/circuit/relay/0.2.0/hop', `/${testProtocol}/id/1.0.0`, `/${testProtocol}/id/push/1.0.0`, `/${testProtocol}/ping/1.0.0` @@ -46,7 +46,7 @@ describe('Protocol prefix is configurable', () => { const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId) expect(protocols).to.include.members([ - '/libp2p/circuit/relay/0.1.0', + '/libp2p/circuit/relay/0.2.0/hop', '/ipfs/id/1.0.0', '/ipfs/id/push/1.0.0', '/ipfs/ping/1.0.0', diff --git a/test/dialing/resolver.spec.ts b/test/dialing/resolver.spec.ts index c36b97b712..dfb80a63e3 100644 --- a/test/dialing/resolver.spec.ts +++ b/test/dialing/resolver.spec.ts @@ -16,7 +16,7 @@ import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/int import { peerIdFromString } from '@libp2p/peer-id' import { pEvent } from 'p-event' import { createFromJSON } from '@libp2p/peer-id-factory' -import { relayV2HopCodec } from '../../src/circuit/multicodec.js' +import { RELAY_V2_HOP_CODEC } from '../../src/circuit/multicodec.js' const relayAddr = MULTIADDRS_WEBSOCKETS[0] @@ -105,7 +105,7 @@ describe('Dialing (resolvable addresses)', () => { // Use the last peer const peerId = await createFromJSON(Peers[Peers.length - 1]) // ensure remote libp2p creates reservation on relay - await remoteLibp2p.components.getPeerStore().protoBook.add(peerId, [relayV2HopCodec]) + await remoteLibp2p.components.getPeerStore().protoBook.add(peerId, [RELAY_V2_HOP_CODEC]) const remoteId = remoteLibp2p.peerId const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${remoteId.toString()}`) const relayedAddrFetched = multiaddr(relayedAddr(remoteId)) @@ -140,7 +140,7 @@ describe('Dialing (resolvable addresses)', () => { // Use the last peer const relayId = await createFromJSON(Peers[Peers.length - 1]) // ensure remote libp2p creates reservation on relay - await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [relayV2HopCodec]) + await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [RELAY_V2_HOP_CODEC]) // create reservation on relay // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -210,7 +210,7 @@ describe('Dialing (resolvable addresses)', () => { // Use the last peer const relayId = await createFromJSON(Peers[Peers.length - 1]) // ensure remote libp2p creates reservation on relay - await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [relayV2HopCodec]) + await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [RELAY_V2_HOP_CODEC]) // create reservation on relay // eslint-disable-next-line @typescript-eslint/no-non-null-assertion diff --git a/test/relay/relay.node.ts b/test/relay/relay.node.ts index 9d3bf65b8e..8798b9faaa 100644 --- a/test/relay/relay.node.ts +++ b/test/relay/relay.node.ts @@ -1,18 +1,19 @@ -import { StreamHandlerV1 } from './../../src/circuit/v1/stream-handler.js' import { expect } from 'aegir/utils/chai.js' -import sinon from 'sinon' import { multiaddr } from '@multiformats/multiaddr' import { pipe } from 'it-pipe' +import { pEvent } from 'p-event' +import sinon from 'sinon' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { createNode } from '../utils/creators/peer.js' +import { RELAY_V2_HOP_CODEC } from '../../src/circuit/multicodec.js' +import { CircuitRelay } from '../../src/circuit/v1/pb/index.js' +import { HopMessage } from '../../src/circuit/v2/pb/index.js' +import { StreamHandlerV2 } from '../../src/circuit/v2/stream-handler.js' import { codes as Errors } from '../../src/errors.js' import type { Libp2pNode } from '../../src/libp2p.js' -import all from 'it-all' -import { relayV1Codec } from '../../src/circuit/multicodec.js' +import { createNode } from '../utils/creators/peer.js' import { createNodeOptions, createRelayOptions } from './utils.js' +import all from 'it-all' import delay from 'delay' -import { CircuitRelay } from '../../src/circuit/v1/pb/index.js' -import { pEvent } from 'p-event' /* eslint-env mocha */ @@ -161,13 +162,11 @@ describe('Dialing (via relay, TCP)', () => { // send an invalid relay message from the relay to the destination peer const connections = relayLibp2p.getConnections(dstLibp2p.peerId) - const stream = await connections[0].newStream(relayV1Codec) - const streamHandler = new StreamHandlerV1({ stream }) - streamHandler.write({ - type: CircuitRelay.Type.STATUS - }) - const res = await streamHandler.read() - expect(res?.code).to.equal(CircuitRelay.Status.MALFORMED_MESSAGE) + const stream = await connections[0].newStream(RELAY_V2_HOP_CODEC) + const streamHandler = new StreamHandlerV2({ stream }) + streamHandler.write(new Uint8Array()) + const res = HopMessage.decode(await streamHandler.read()) + expect(res?.status).to.equal(CircuitRelay.Status.MALFORMED_MESSAGE) streamHandler.close() // should still be connected @@ -186,7 +185,7 @@ describe('Dialing (via relay, TCP)', () => { }, hop: { // very short timeout - timeout: 10 + timeout: 5 } } }) @@ -196,7 +195,7 @@ describe('Dialing (via relay, TCP)', () => { const dialAddr = relayAddr.encapsulate(`/p2p/${relayLibp2p.peerId.toString()}`) const connection = await srcLibp2p.dial(dialAddr) - const stream = await connection.newStream('/libp2p/circuit/relay/0.1.0') + const stream = await connection.newStream(RELAY_V2_HOP_CODEC) await stream.sink(async function * () { // delay for longer than the timeout