Skip to content

Commit

Permalink
fix: update code from master
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored and ckousik committed Dec 23, 2022
1 parent 574f30e commit cd6ac9c
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 100 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@
"build": "aegir build",
"docs": "aegir docs",
"generate": "run-s generate:proto:*",
"generate:proto:circuit": "protons ./src/circuit/pb/index.proto",
"generate:proto:circuitv1": "protons ./src/circuit/v1/pb/index.proto",
"generate:proto:circuitv2": "protons ./src/circuit/v2/pb/index.proto",
"generate:proto:fetch": "protons ./src/fetch/pb/proto.proto",
"generate:proto:identify": "protons ./src/identify/pb/message.proto",
"generate:proto:plaintext": "protons ./src/insecure/pb/proto.proto",
Expand Down
70 changes: 0 additions & 70 deletions src/circuit/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,7 @@ import { StreamHandlerV2 } from './v2/stream-handler.js'
import { StreamHandlerV1 } from './v1/stream-handler.js'
import * as CircuitV1Handler from './v1/index.js'
import * as CircuitV2Handler from './v2/index.js'
<<<<<<< HEAD
import type { ReservationStore } from './v2/reservation-store.js'
=======
import { TimeoutController } from 'timeout-abort-controller'
import type { RelayConfig } from '../index.js'
import { setMaxListeners } from 'events'
import { abortableDuplex } from 'abortable-iterator'
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)

const log = logger('libp2p:circuit')

Expand All @@ -41,7 +34,6 @@ interface ConnectOptions {
ma: Multiaddr
disconnectOnFailure: boolean
}
<<<<<<< HEAD

export class Circuit implements Transport, Startable {
private handler?: ConnectionHandler
Expand All @@ -55,17 +47,6 @@ export class Circuit implements Transport, Startable {
this.components = components
this._started = false
this.reservationStore = new ReservationStore()
=======
export class Circuit implements Transport, Initializable {
private handler?: ConnectionHandler
private components: Components = new Components()
private readonly reservationStore: ReservationStore
private readonly _init: RelayConfig

constructor (options: RelayConfig) {
this._init = options
this.reservationStore = new ReservationStore(options.limit)
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
}

isStarted() {
Expand Down Expand Up @@ -131,7 +112,6 @@ export class Circuit implements Transport, Initializable {

async _onProtocolV1(data: IncomingStreamData) {
const { connection, stream } = data
<<<<<<< HEAD
const streamHandler = new StreamHandlerV1({ stream })
try {
const request = await streamHandler.read()
Expand Down Expand Up @@ -159,43 +139,6 @@ export class Circuit implements Transport, Initializable {
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE)
}
}
=======
const controller = new TimeoutController(this._init.hop.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
} catch {}

try {
const source = abortableDuplex(stream, controller.signal)
const streamHandler = new StreamHandlerV1({ stream: { ...stream, ...source } })
const request = await streamHandler.read()

if (request == null) {
log('request was invalid, could not read from stream')
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE)
return
}

switch (request.type) {
case CircuitV1.CircuitRelay.Type.CAN_HOP:
case CircuitV1.CircuitRelay.Type.HOP: {
log('received circuit v1 hop request from %p', connection.remotePeer)
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.HOP_CANT_SPEAK_RELAY)
break
}
case CircuitV1.CircuitRelay.Type.STOP: {
log('received circuit v1 stop request from %p', connection.remotePeer)
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.STOP_RELAY_REFUSED)
break
}
default: {
log('Request of type %s not supported', request.type)
CircuitV1Handler.handleCircuitV1Error(streamHandler, CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE)
}
}
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
} finally {
controller.clear()
}
Expand Down Expand Up @@ -299,33 +242,20 @@ export class Circuit implements Transport, Initializable {
}

try {
<<<<<<< HEAD
const stream = await relayConnection.newStream([protocolIDv2Hop, RELAY_V1_CODEC])

switch (stream.protocol) {
case relayV1Codec: return await this.connectV1({
stream: stream.stream,
=======
const stream = await relayConnection.newStream([RELAY_V2_HOP_CODEC, RELAY_V1_CODEC])

switch (stream.stat.protocol) {
case RELAY_V1_CODEC: return await this.connectV1({
stream,
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
connection: relayConnection,
destinationPeer,
destinationAddr,
relayAddr,
ma,
disconnectOnFailure
})
<<<<<<< HEAD
case relayV2HopCodec: return await this.connectV2({
stream: stream.stream,
=======
case RELAY_V2_HOP_CODEC: return await this.connectV2({
stream,
>>>>>>> 5268cb3f (fix: bugs, circuitv1 code)
connection: relayConnection,
destinationPeer,
destinationAddr,
Expand Down
1 change: 0 additions & 1 deletion src/circuit/v1/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ export class StreamHandlerV1 {

this.stream = stream
this.shake = handshake(this.stream)
// @ts-expect-error some type incompatibilities
this.decoder = lp.decode.fromReader(this.shake.reader, { maxDataLength: maxLength })
}

Expand Down
5 changes: 3 additions & 2 deletions src/circuit/v2/hop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import type { Connection } from '@libp2p/interfaces/connection'
import { HopMessage, IHopMessage, IReservation, ILimit, Status, StopMessage } from './pb/index.js'
import { StreamHandlerV2 } from './stream-handler.js'
import type { Circuit } from '../transport.js'
import { Multiaddr } from '@multiformats/multiaddr'
import type { Multiaddr } from '@multiformats/multiaddr'
import { multiaddr } from '@multiformats/multiaddr'
import type { Acl, ReservationStore } from './interfaces.js'
import { RELAY_V2_HOP_CODEC } from '../multicodec.js'
import { validateHopConnectRequest } from './validation.js'
Expand Down Expand Up @@ -151,7 +152,7 @@ async function handleConnect (options: HopConnectOptions) {
type: StopMessage.Type.CONNECT,
peer: {
id: connection.remotePeer.toBytes(),
addrs: [new Multiaddr('/p2p/' + connection.remotePeer.toString()).bytes]
addrs: [multiaddr('/p2p/' + connection.remotePeer.toString()).bytes]
}
}
})
Expand Down
Loading

0 comments on commit cd6ac9c

Please sign in to comment.