Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: update to libp2p@2.x.x #504

Merged
merged 4 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12,257 changes: 5,121 additions & 7,136 deletions package-lock.json

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,44 +73,44 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/crypto": "^4.0.1",
"@libp2p/interface": "^1.5.0",
"@libp2p/interface-internal": "^1.0.7",
"@libp2p/peer-id": "^4.0.5",
"@libp2p/pubsub": "^9.0.8",
"@libp2p/crypto": "^5.0.0",
"@libp2p/interface": "^2.0.0",
"@libp2p/interface-internal": "^2.0.0",
"@libp2p/peer-id": "^5.0.0",
"@libp2p/pubsub": "^10.0.0",
"@multiformats/multiaddr": "^12.1.14",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"multiformats": "^13.0.1",
"protons-runtime": "5.4.0",
"protons-runtime": "^5.5.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.1"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.4.1",
"@dapplion/benchmark": "^0.2.4",
"@libp2p/floodsub": "^9.0.9",
"@libp2p/interface-compliance-tests": "^5.2.0",
"@libp2p/logger": "^4.0.5",
"@libp2p/peer-id-factory": "^4.0.5",
"@libp2p/peer-store": "^10.0.8",
"@libp2p/floodsub": "^10.0.0",
"@libp2p/interface-compliance-tests": "^6.0.0",
"@libp2p/logger": "^5.0.0",
"@libp2p/peer-store": "^11.0.0",
"@types/node": "^20.11.6",
"@types/sinon": "^17.0.3",
"abortable-iterator": "^5.1.0",
"aegir": "^42.2.2",
"datastore-core": "^9.2.7",
"aegir": "^44.1.1",
"datastore-core": "^10.0.0",
"delay": "^6.0.0",
"mkdirp": "^3.0.1",
"it-all": "^3.0.6",
"mkdirp": "^3.0.1",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-retry": "^6.2.0",
"p-wait-for": "^5.0.2",
"protons": "^7.5.0",
"sinon": "^17.0.1",
"time-cache": "^0.3.0",
"ts-sinon": "^2.0.2"
"sinon": "^18.0.1",
"sinon-ts": "^2.0.0",
"time-cache": "^0.3.0"
},
"engines": {
"npm": ">=8.7.0"
Expand Down
17 changes: 17 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export class InvalidPeerScoreParamsError extends Error {
static name = 'InvalidPeerScoreParamsError'

constructor (message = 'Invalid peer score params') {
super(message)
this.name = 'InvalidPeerScoreParamsError'
}
}

export class InvalidPeerScoreThresholdsError extends Error {
static name = 'InvalidPeerScoreThresholdsError'

constructor (message = 'Invalid peer score thresholds') {
super(message)
this.name = 'InvalidPeerScoreThresholdsError'
}
}
35 changes: 19 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { TypedEventEmitter, StrictSign, StrictNoSign, TopicValidatorResult, serviceCapabilities, serviceDependencies } from '@libp2p/interface'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { peerIdFromMultihash, peerIdFromString } from '@libp2p/peer-id'
import { encode } from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import * as Digest from 'multiformats/hashes/digest'
import * as constants from './constants.js'
import {
ACCEPT_FROM_WHITELIST_DURATION_MS,
Expand Down Expand Up @@ -73,7 +74,8 @@ import type {
TopicValidatorFn,
Logger,
ComponentLogger,
Topology
Topology,
PrivateKey
} from '@libp2p/interface'
import type { ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -166,13 +168,13 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
maxOutboundStreams?: number

/**
* Pass true to run on transient connections - data or time-limited
* Pass true to run on limited connections - data or time-limited
* connections that may be closed at any time such as circuit relay
* connections.
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* Specify max buffer size in bytes for OutboundStream.
Expand Down Expand Up @@ -259,6 +261,7 @@ interface AcceptFromWhitelistEntry {
}

export interface GossipSubComponents {
privateKey: PrivateKey
peerId: PeerId
peerStore: PeerStore
registrar: Registrar
Expand Down Expand Up @@ -420,7 +423,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
private status: GossipStatus = { code: GossipStatusCode.stopped }
private readonly maxInboundStreams?: number
private readonly maxOutboundStreams?: number
private readonly runOnTransientConnection?: boolean
private readonly runOnLimitedConnection?: boolean
private readonly allowedTopics: Set<TopicStr> | null

private heartbeatTimer: {
Expand Down Expand Up @@ -554,7 +557,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

this.maxInboundStreams = options.maxInboundStreams
this.maxOutboundStreams = options.maxOutboundStreams
this.runOnTransientConnection = options.runOnTransientConnection
this.runOnLimitedConnection = options.runOnLimitedConnection

this.allowedTopics = (opts.allowedTopics != null) ? new Set(opts.allowedTopics) : null
}
Expand Down Expand Up @@ -591,7 +594,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

this.log('starting')

this.publishConfig = await getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.peerId)
this.publishConfig = getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.peerId, this.components.privateKey)

// Create the outbound inflight queue
// This ensures that outbound stream creation happens sequentially
Expand Down Expand Up @@ -619,7 +622,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
registrar.handle(multicodec, this.onIncomingStream.bind(this), {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
})
)
)
Expand All @@ -646,7 +649,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const topology: Topology = {
onConnect: this.onPeerConnected.bind(this),
onDisconnect: this.onPeerDisconnected.bind(this),
notifyOnTransient: this.runOnTransientConnection
notifyOnLimitedConnection: this.runOnLimitedConnection
}
const registrarTopologyIds = await Promise.all(
this.multicodecs.map(async (multicodec) => registrar.register(multicodec, topology))
Expand Down Expand Up @@ -817,7 +820,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
try {
const stream = new OutboundStream(
await connection.newStream(this.multicodecs, {
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
}),
(e) => { this.log.error('outbound pipe error', e) },
{ maxBufferSize: this.opts.maxOutboundBufferSize }
Expand Down Expand Up @@ -1778,7 +1781,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
return
}

const peer = peerIdFromBytes(pi.peerID)
const peer = peerIdFromMultihash(Digest.decode(pi.peerID))
const p = peer.toString()

if (this.peers.has(p)) {
Expand Down Expand Up @@ -1895,7 +1898,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

// remove explicit peers, peers with negative scores, and backoffed peers
fanoutPeers.forEach((id) => {
if (!this.direct.has(id) && this.score.score(id) >= 0 && ((backoff == null) || !backoff.has(id))) {
if (!this.direct.has(id) && this.score.score(id) >= 0 && backoff?.has(id) !== true) {
toAdd.add(id)
}
})
Expand All @@ -1911,7 +1914,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.opts.D,
(id: PeerIdStr): boolean =>
// filter direct peers and peers with negative score
!toAdd.has(id) && !this.direct.has(id) && this.score.score(id) >= 0 && ((backoff == null) || !backoff.has(id))
!toAdd.has(id) && !this.direct.has(id) && this.score.score(id) >= 0 && backoff?.has(id) !== true
)

newPeers.forEach((peer) => {
Expand Down Expand Up @@ -2610,13 +2613,13 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
try {
peerInfo = await this.components.peerStore.get(id)
} catch (err: any) {
if (err.code !== 'ERR_NOT_FOUND') {
if (err.name !== 'NotFoundError') {
throw err
}
}

return {
peerID: id.toBytes(),
peerID: id.toMultihash().bytes,
signedPeerRecord: peerInfo?.peerRecordEnvelope
}
})
Expand Down Expand Up @@ -2741,7 +2744,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
!this.direct.has(id)
) {
const score = getScore(id)
if (((backoff == null) || !backoff.has(id)) && score >= 0) candidateMeshPeers.add(id)
if (backoff?.has(id) !== true && score >= 0) candidateMeshPeers.add(id)
// instead of having to find gossip peers after heartbeat which require another loop
// we prepare peers to gossip in a topic within heartbeat to improve performance
if (score >= this.opts.scoreThresholds.gossipThreshold) peersToGossip.add(id)
Expand Down
52 changes: 34 additions & 18 deletions src/message/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */

import { type Codec, CodeError, decodeMessage, type DecodeOptions, encodeMessage, message } from 'protons-runtime'
import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, MaxLengthError, message } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface RPC {
Expand Down Expand Up @@ -256,34 +256,42 @@ export namespace RPC {
switch (tag >>> 3) {
case 1: {
if (opts.limits?.ihave != null && obj.ihave.length === opts.limits.ihave) {
throw new CodeError('decode error - map field "ihave" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "ihave" had too many elements')
}

obj.ihave.push(RPC.ControlIHave.codec().decode(reader, reader.uint32()))
obj.ihave.push(RPC.ControlIHave.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.ihave$
}))
break
}
case 2: {
if (opts.limits?.iwant != null && obj.iwant.length === opts.limits.iwant) {
throw new CodeError('decode error - map field "iwant" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "iwant" had too many elements')
}

obj.iwant.push(RPC.ControlIWant.codec().decode(reader, reader.uint32()))
obj.iwant.push(RPC.ControlIWant.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.iwant$
}))
break
}
case 3: {
if (opts.limits?.graft != null && obj.graft.length === opts.limits.graft) {
throw new CodeError('decode error - map field "graft" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "graft" had too many elements')
}

obj.graft.push(RPC.ControlGraft.codec().decode(reader, reader.uint32()))
obj.graft.push(RPC.ControlGraft.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.graft$
}))
break
}
case 4: {
if (opts.limits?.prune != null && obj.prune.length === opts.limits.prune) {
throw new CodeError('decode error - map field "prune" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "prune" had too many elements')
}

obj.prune.push(RPC.ControlPrune.codec().decode(reader, reader.uint32()))
obj.prune.push(RPC.ControlPrune.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.prune$
}))
break
}
default: {
Expand Down Expand Up @@ -356,7 +364,7 @@ export namespace RPC {
}
case 2: {
if (opts.limits?.messageIDs != null && obj.messageIDs.length === opts.limits.messageIDs) {
throw new CodeError('decode error - map field "messageIDs" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "messageIDs" had too many elements')
}

obj.messageIDs.push(reader.bytes())
Expand Down Expand Up @@ -422,7 +430,7 @@ export namespace RPC {
switch (tag >>> 3) {
case 1: {
if (opts.limits?.messageIDs != null && obj.messageIDs.length === opts.limits.messageIDs) {
throw new CodeError('decode error - map field "messageIDs" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "messageIDs" had too many elements')
}

obj.messageIDs.push(reader.bytes())
Expand Down Expand Up @@ -562,10 +570,12 @@ export namespace RPC {
}
case 2: {
if (opts.limits?.peers != null && obj.peers.length === opts.limits.peers) {
throw new CodeError('decode error - map field "peers" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "peers" had too many elements')
}

obj.peers.push(RPC.PeerInfo.codec().decode(reader, reader.uint32()))
obj.peers.push(RPC.PeerInfo.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.peers$
}))
break
}
case 3: {
Expand Down Expand Up @@ -708,22 +718,28 @@ export namespace RPC {
switch (tag >>> 3) {
case 1: {
if (opts.limits?.subscriptions != null && obj.subscriptions.length === opts.limits.subscriptions) {
throw new CodeError('decode error - map field "subscriptions" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "subscriptions" had too many elements')
}

obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32()))
obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.subscriptions$
}))
break
}
case 2: {
if (opts.limits?.messages != null && obj.messages.length === opts.limits.messages) {
throw new CodeError('decode error - map field "messages" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "messages" had too many elements')
}

obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32()))
obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.messages$
}))
break
}
case 3: {
obj.control = RPC.ControlMessage.codec().decode(reader, reader.uint32())
obj.control = RPC.ControlMessage.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.control
})
break
}
default: {
Expand Down
2 changes: 0 additions & 2 deletions src/score/constants.ts

This file was deleted.

Loading
Loading