Skip to content

Commit

Permalink
fix: bugs, circuitv1 code
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrunic authored and ckousik committed Dec 23, 2022
1 parent f4bbed0 commit a03a6b1
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 53 deletions.
6 changes: 3 additions & 3 deletions src/circuit/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { logger } from '@libp2p/logger'
import { relayV2HopCodec } from './multicodec.js'
import { RELAY_V2_HOP_CODEC } from './multicodec.js'
import { getExpiration, namespaceToCid } from './utils.js'
import {
CIRCUIT_PROTO_CODE,
Expand Down Expand Up @@ -99,7 +99,7 @@ export class CircuitService extends EventEmitter<CircuitServiceEvents> implement
}

// Check if it has the protocol
const hasProtocol = protocols.find(protocol => protocol === relayV2HopCodec)
const hasProtocol = protocols.find(protocol => protocol === RELAY_V2_HOP_CODEC)
log.trace(`Peer ${peerId.toString()} protocol change`, this.components.getPeerId().toString())

// If no protocol, check if we were keeping the peer before as a listenRelay
Expand Down Expand Up @@ -252,7 +252,7 @@ export class CircuitService extends EventEmitter<CircuitServiceEvents> implement
continue
}

const hasProtocol = protocols.find(protocol => protocol === relayV2HopCodec)
const hasProtocol = protocols.find(protocol => protocol === RELAY_V2_HOP_CODEC)

// Continue to next if it does not support Hop
if (hasProtocol == null) {
Expand Down
6 changes: 3 additions & 3 deletions src/circuit/multicodec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

export const relayV1Codec = '/libp2p/circuit/relay/0.1.0'
export const relayV2HopCodec = '/libp2p/circuit/relay/0.2.0/hop'
export const relayV2StopCodec = '/libp2p/circuit/relay/0.2.0/stop'
export const RELAY_V1_CODEC = '/libp2p/circuit/relay/0.1.0'
export const RELAY_V2_HOP_CODEC = '/libp2p/circuit/relay/0.2.0/hop'
export const RELAY_V2_STOP_CODEC = '/libp2p/circuit/relay/0.2.0/stop'
124 changes: 106 additions & 18 deletions src/circuit/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { Multiaddr } from '@multiformats/multiaddr'
import { multiaddr } from '@multiformats/multiaddr'
import { codes } from '../errors.js'
import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn'
import { relayV2HopCodec, relayV1Codec, relayV2StopCodec } from './multicodec.js'
import { RELAY_V2_HOP_CODEC, RELAY_V1_CODEC, RELAY_V2_STOP_CODEC } from './multicodec.js'
import { createListener } from './listener.js'
import { peerIdFromString } from '@libp2p/peer-id'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
Expand All @@ -17,7 +17,14 @@ import { StreamHandlerV2 } from './v2/stream-handler.js'
import { StreamHandlerV1 } from './v1/stream-handler.js'
import * as CircuitV1Handler from './v1/index.js'
import * as CircuitV2Handler from './v2/index.js'
<<<<<<< HEAD
import type { ReservationStore } from './v2/reservation-store.js'
=======
import { TimeoutController } from 'timeout-abort-controller'
import type { RelayConfig } from '../index.js'
import { setMaxListeners } from 'events'
import { abortableDuplex } from 'abortable-iterator'
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)

const log = logger('libp2p:circuit')

Expand All @@ -34,6 +41,7 @@ interface ConnectOptions {
ma: Multiaddr
disconnectOnFailure: boolean
}
<<<<<<< HEAD

export class Circuit implements Transport, Startable {
private handler?: ConnectionHandler
Expand All @@ -47,6 +55,17 @@ export class Circuit implements Transport, Startable {
this.components = components
this._started = false
this.reservationStore = new ReservationStore()
=======
export class Circuit implements Transport, Initializable {
private handler?: ConnectionHandler
private components: Components = new Components()
private readonly reservationStore: ReservationStore
private readonly _init: RelayConfig

constructor (options: RelayConfig) {
this._init = options
this.reservationStore = new ReservationStore(options.limit)
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
}

isStarted() {
Expand All @@ -60,23 +79,23 @@ export class Circuit implements Transport, Startable {

this._started = true

void this.components.getRegistrar().handle(relayV1Codec, (data) => {
void this.components.getRegistrar().handle(RELAY_V1_CODEC, (data) => {
void this._onProtocolV1(data).catch(err => {
log.error(err)
})
})
.catch(err => {
log.error(err)
})
void this.components.getRegistrar().handle(relayV2HopCodec, (data) => {
void this.components.getRegistrar().handle(RELAY_V2_HOP_CODEC, (data) => {
void this._onV2ProtocolHop(data).catch(err => {
log.error(err)
})
})
.catch(err => {
log.error(err)
})
void this.components.getRegistrar().handle(relayV2StopCodec, (data) => {
void this.components.getRegistrar().handle(RELAY_V2_STOP_CODEC, (data) => {
void this._onV2ProtocolStop(data).catch(err => {
log.error(err)
})
Expand Down Expand Up @@ -112,6 +131,7 @@ export class Circuit implements Transport, Startable {

async _onProtocolV1(data: IncomingStreamData) {
const { connection, stream } = data
<<<<<<< HEAD
const streamHandler = new StreamHandlerV1({ stream })
try {
const request = await streamHandler.read()
Expand Down Expand Up @@ -139,29 +159,84 @@ export class Circuit implements Transport, Startable {
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE)
}
}
=======
const controller = new TimeoutController(this._init.hop.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
} catch {}

try {
const source = abortableDuplex(stream, controller.signal)
const streamHandler = new StreamHandlerV1({ stream: { ...stream, ...source } })
const request = await streamHandler.read()

if (request == null) {
log('request was invalid, could not read from stream')
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE)
return
}

switch (request.type) {
case CircuitV1.CircuitRelay.Type.CAN_HOP:
case CircuitV1.CircuitRelay.Type.HOP: {
log('received circuit v1 hop request from %p', connection.remotePeer)
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.HOP_CANT_SPEAK_RELAY)
break
}
case CircuitV1.CircuitRelay.Type.STOP: {
log('received circuit v1 stop request from %p', connection.remotePeer)
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.STOP_RELAY_REFUSED)
break
}
default: {
log('Request of type %s not supported', request.type)
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE)
}
}
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
} finally {
controller.clear()
}
}

async _onV2ProtocolHop({ connection, stream }: IncomingStreamData) {
log('received circuit v2 hop protocol stream from %s', connection.remotePeer)
const streamHandler = new StreamHandlerV2({ stream })
const request = CircuitV2.HopMessage.decode(await streamHandler.read())
const controller = new TimeoutController(this._init.hop.timeout)

if (request?.type === undefined) {
return
}
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
} catch {}

await CircuitV2Handler.handleHopProtocol({
connection,
streamHandler,
circuit: this,
relayPeer: this.components.getPeerId(),
relayAddrs: this.components.getAddressManager().getListenAddrs(),
reservationStore: this.reservationStore,
request
})
try {
const source = abortableDuplex(stream, controller.signal)
const streamHandler = new StreamHandlerV2({ stream: { ...stream, ...source } })
const request = CircuitV2.HopMessage.decode(await streamHandler.read())

if (request?.type == null) {
log('request was invalid, could not read from stream')
streamHandler.write(CircuitV2.HopMessage.encode({
type: CircuitV2.HopMessage.Type.STATUS,
status: CircuitV2.Status.MALFORMED_MESSAGE
}))
streamHandler.close()
return
}

await CircuitV2Handler.handleHopProtocol({
connection,
streamHandler,
circuit: this,
relayPeer: this.components.getPeerId(),
relayAddrs: this.components.getAddressManager().getListenAddrs(),
reservationStore: this.reservationStore,
request
})
} finally {
controller.clear()
}
}

async _onV2ProtocolStop({ connection, stream }: IncomingStreamData) {
Expand Down Expand Up @@ -224,20 +299,33 @@ export class Circuit implements Transport, Startable {
}

try {
<<<<<<< HEAD
const stream = await relayConnection.newStream([protocolIDv2Hop, RELAY_V1_CODEC])

switch (stream.protocol) {
case relayV1Codec: return await this.connectV1({
stream: stream.stream,
=======
const stream = await relayConnection.newStream([RELAY_V2_HOP_CODEC, RELAY_V1_CODEC])

switch (stream.stat.protocol) {
case RELAY_V1_CODEC: return await this.connectV1({
stream,
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
connection: relayConnection,
destinationPeer,
destinationAddr,
relayAddr,
ma,
disconnectOnFailure
})
<<<<<<< HEAD
case relayV2HopCodec: return await this.connectV2({
stream: stream.stream,
=======
case RELAY_V2_HOP_CODEC: return await this.connectV2({
stream,
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
connection: relayConnection,
destinationPeer,
destinationAddr,
Expand Down
4 changes: 2 additions & 2 deletions src/circuit/v2/hop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { StreamHandlerV2 } from './stream-handler.js'
import type { Circuit } from '../transport.js'
import { Multiaddr } from '@multiformats/multiaddr'
import type { Acl, ReservationStore } from './interfaces.js'
import { relayV2HopCodec } from '../multicodec.js'
import { RELAY_V2_HOP_CODEC } from '../multicodec.js'
import { validateHopConnectRequest } from './validation.js'
import { stop } from './stop.js'
import { ReservationVoucherRecord } from './reservation-voucher.js'
Expand Down Expand Up @@ -42,7 +42,7 @@ export async function handleHopProtocol (options: HopProtocolOptions) {

export async function reserve (connection: Connection) {
log('requesting reservation from %s', connection.remotePeer)
const { stream } = await connection.newStream([relayV2HopCodec])
const stream = await connection.newStream([RELAY_V2_HOP_CODEC])
const streamHandler = new StreamHandlerV2({ stream })
streamHandler.write(HopMessage.encode({
type: HopMessage.Type.RESERVE
Expand Down
4 changes: 2 additions & 2 deletions src/circuit/v2/stop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { Connection } from '@libp2p/interfaces/connection'

import { logger } from '@libp2p/logger'
import { StreamHandlerV2 } from './stream-handler.js'
import { relayV2StopCodec } from '../multicodec.js'
import { RELAY_V2_STOP_CODEC } from '../multicodec.js'
import { validateStopConnectRequest } from './validation.js'

const log = logger('libp2p:circuitv2:stop')
Expand Down Expand Up @@ -54,7 +54,7 @@ export async function stop ({
connection,
request
}: StopOptions) {
const { stream } = await connection.newStream([relayV2StopCodec])
const stream = await connection.newStream([RELAY_V2_STOP_CODEC])
log('starting circuit relay v2 stop request to %s', connection.remotePeer)
const streamHandler = new StreamHandlerV2({ stream })
streamHandler.write(StopMessage.encode(request).finish())
Expand Down
33 changes: 30 additions & 3 deletions test/circuit/v2/hop.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
<<<<<<< HEAD

import { relayV2StopCodec } from './../../../src/circuit/multicodec.js'
import { mockDuplex, mockConnection, mockMultiaddrConnection, mockStream } from '@libp2p/interface-compliance-tests/mocks'
Expand All @@ -9,10 +10,21 @@ import type { Connection } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import { Status, StopMessage, HopMessage } from '../../../src/circuit/v2/pb/index.js'
import { ReservationStore } from '../../../src/circuit/v2/reservation-store.js'
import sinon from 'sinon'
import { Circuit } from '../../../src/circuit/transport.js'
=======
import type { Connection } from '@libp2p/interface-connection'
import { mockConnection, mockDuplex, mockMultiaddrConnection, mockStream } from '@libp2p/interface-mocks'
import type { PeerId } from '@libp2p/interface-peer-id'
import { Multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import { pair } from 'it-pair'
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
import sinon from 'sinon'
import { Circuit } from '../../../src/circuit/transport.js'
import { handleHopProtocol } from '../../../src/circuit/v2/hop.js'
import { HopMessage, Status, StopMessage } from '../../../src/circuit/v2/pb/index.js'
import { ReservationStore } from '../../../src/circuit/v2/reservation-store.js'
import { StreamHandlerV2 } from '../../../src/circuit/v2/stream-handler.js'
import * as peerUtils from '../../utils/creators/peer.js'

/* eslint-env mocha */

Expand Down Expand Up @@ -147,7 +159,22 @@ describe('Circuit v2 - hop protocol', function () {
conn = await mockConnection(mockMultiaddrConnection(mockDuplex(), relayPeer))
streamHandler = new StreamHandlerV2({ stream: mockStream(pair<Uint8Array>()) })
reservationStore = new ReservationStore()
circuit = new Circuit({})
circuit = new Circuit({
enabled: true,
limit: 15,
advertise: {
enabled: false
},
hop: {
enabled: true,
active: false,
timeout: 30000
},
autoRelay: {
enabled: false,
maxListeners: 2
}
})
})

this.afterEach(async function () {
Expand Down
4 changes: 2 additions & 2 deletions test/configuration/protocol-prefix.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe('Protocol prefix is configurable', () => {
const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId)
expect(protocols).to.include.members([
`/${testProtocol}/fetch/0.0.1`,
'/libp2p/circuit/relay/0.1.0',
'/libp2p/circuit/relay/0.2.0/hop',
`/${testProtocol}/id/1.0.0`,
`/${testProtocol}/id/push/1.0.0`,
`/${testProtocol}/ping/1.0.0`
Expand All @@ -46,7 +46,7 @@ describe('Protocol prefix is configurable', () => {

const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId)
expect(protocols).to.include.members([
'/libp2p/circuit/relay/0.1.0',
'/libp2p/circuit/relay/0.2.0/hop',
'/ipfs/id/1.0.0',
'/ipfs/id/push/1.0.0',
'/ipfs/ping/1.0.0',
Expand Down
8 changes: 4 additions & 4 deletions test/dialing/resolver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/int
import { peerIdFromString } from '@libp2p/peer-id'
import { pEvent } from 'p-event'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { relayV2HopCodec } from '../../src/circuit/multicodec.js'
import { RELAY_V2_HOP_CODEC } from '../../src/circuit/multicodec.js'

const relayAddr = MULTIADDRS_WEBSOCKETS[0]

Expand Down Expand Up @@ -105,7 +105,7 @@ describe('Dialing (resolvable addresses)', () => {
// Use the last peer
const peerId = await createFromJSON(Peers[Peers.length - 1])
// ensure remote libp2p creates reservation on relay
await remoteLibp2p.components.getPeerStore().protoBook.add(peerId, [relayV2HopCodec])
await remoteLibp2p.components.getPeerStore().protoBook.add(peerId, [RELAY_V2_HOP_CODEC])
const remoteId = remoteLibp2p.peerId
const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${remoteId.toString()}`)
const relayedAddrFetched = multiaddr(relayedAddr(remoteId))
Expand Down Expand Up @@ -140,7 +140,7 @@ describe('Dialing (resolvable addresses)', () => {
// Use the last peer
const relayId = await createFromJSON(Peers[Peers.length - 1])
// ensure remote libp2p creates reservation on relay
await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [relayV2HopCodec])
await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [RELAY_V2_HOP_CODEC])

// create reservation on relay
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand Down Expand Up @@ -210,7 +210,7 @@ describe('Dialing (resolvable addresses)', () => {
// Use the last peer
const relayId = await createFromJSON(Peers[Peers.length - 1])
// ensure remote libp2p creates reservation on relay
await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [relayV2HopCodec])
await remoteLibp2p.components.getPeerStore().protoBook.add(relayId, [RELAY_V2_HOP_CODEC])

// create reservation on relay
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand Down
Loading

0 comments on commit a03a6b1

Please sign in to comment.